xref: /redis-3.2.3/src/server.c (revision cfc08b65)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "cluster.h"
32 #include "slowlog.h"
33 #include "bio.h"
34 #include "latency.h"
35 
36 #include <time.h>
37 #include <signal.h>
38 #include <sys/wait.h>
39 #include <errno.h>
40 #include <assert.h>
41 #include <ctype.h>
42 #include <stdarg.h>
43 #include <arpa/inet.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46 #include <sys/time.h>
47 #include <sys/resource.h>
48 #include <sys/uio.h>
49 #include <sys/un.h>
50 #include <limits.h>
51 #include <float.h>
52 #include <math.h>
53 #include <sys/resource.h>
54 #include <sys/utsname.h>
55 #include <locale.h>
56 #include <sys/socket.h>
57 
58 /* Our shared "common" objects */
59 
60 struct sharedObjectsStruct shared;
61 
62 /* Global vars that are actually used as constants. The following double
63  * values are used for double on-disk serialization, and are initialized
64  * at runtime to avoid strange compiler optimizations. */
65 
66 double R_Zero, R_PosInf, R_NegInf, R_Nan;
67 
68 /*================================= Globals ================================= */
69 
70 /* Global vars */
71 struct redisServer server; /* server global state */
72 
73 /* Our command table.
74  *
75  * Every entry is composed of the following fields:
76  *
77  * name: a string representing the command name.
78  * function: pointer to the C function implementing the command.
79  * arity: number of arguments, it is possible to use -N to say >= N
80  * sflags: command flags as string. See below for a table of flags.
81  * flags: flags as bitmask. Computed by Redis using the 'sflags' field.
82  * get_keys_proc: an optional function to get key arguments from a command.
83  *                This is only used when the following three fields are not
84  *                enough to specify what arguments are keys.
85  * first_key_index: first argument that is a key
86  * last_key_index: last argument that is a key
87  * key_step: step to get all the keys from first to last argument. For instance
88  *           in MSET the step is two since arguments are key,val,key,val,...
89  * microseconds: microseconds of total execution time for this command.
90  * calls: total number of calls of this command.
91  *
92  * The flags, microseconds and calls fields are computed by Redis and should
93  * always be set to zero.
94  *
95  * Command flags are expressed using strings where every character represents
96  * a flag. Later the populateCommandTable() function will take care of
97  * populating the real 'flags' field using this characters.
98  *
99  * This is the meaning of the flags:
100  *
101  * w: write command (may modify the key space).
102  * r: read command  (will never modify the key space).
103  * m: may increase memory usage once called. Don't allow if out of memory.
104  * a: admin command, like SAVE or SHUTDOWN.
105  * p: Pub/Sub related command.
106  * f: force replication of this command, regardless of server.dirty.
107  * s: command not allowed in scripts.
108  * R: random command. Command is not deterministic, that is, the same command
109  *    with the same arguments, with the same key space, may have different
110  *    results. For instance SPOP and RANDOMKEY are two random commands.
111  * S: Sort command output array if called from script, so that the output
112  *    is deterministic.
113  * l: Allow command while loading the database.
114  * t: Allow command while a slave has stale data but is not allowed to
115  *    server this data. Normally no command is accepted in this condition
116  *    but just a few.
117  * M: Do not automatically propagate the command on MONITOR.
118  * k: Perform an implicit ASKING for this command, so the command will be
119  *    accepted in cluster mode if the slot is marked as 'importing'.
120  * F: Fast command: O(1) or O(log(N)) command that should never delay
121  *    its execution as long as the kernel scheduler is giving us time.
122  *    Note that commands that may trigger a DEL as a side effect (like SET)
123  *    are not fast commands.
124  */
125 struct redisCommand redisCommandTable[] = {
126     {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
127     {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
128     {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
129     {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
130     {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
131     {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
132     {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
133     {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
134     {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
135     {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
136     {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
137     {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0},
138     {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
139     {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
140     {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
141     {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
142     {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
143     {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
144     {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
145     {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
146     {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
147     {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
148     {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
149     {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
150     {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
151     {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
152     {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
153     {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
154     {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
155     {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
156     {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
157     {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
158     {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
159     {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
160     {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
161     {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
162     {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
163     {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
164     {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
165     {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
166     {"spop",spopCommand,-2,"wRsF",0,NULL,1,1,1,0,0},
167     {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
168     {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
169     {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
170     {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
171     {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
172     {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
173     {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
174     {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
175     {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
176     {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
177     {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
178     {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0},
179     {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0},
180     {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0},
181     {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0},
182     {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
183     {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
184     {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
185     {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
186     {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
187     {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
188     {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
189     {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0},
190     {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0},
191     {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
192     {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0},
193     {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0},
194     {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
195     {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
196     {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
197     {"hset",hsetCommand,4,"wmF",0,NULL,1,1,1,0,0},
198     {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
199     {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
200     {"hmset",hmsetCommand,-4,"wm",0,NULL,1,1,1,0,0},
201     {"hmget",hmgetCommand,-3,"r",0,NULL,1,1,1,0,0},
202     {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
203     {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0},
204     {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
205     {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0},
206     {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0},
207     {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
208     {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
209     {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0},
210     {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0},
211     {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
212     {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
213     {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
214     {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0},
215     {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0},
216     {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
217     {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
218     {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
219     {"select",selectCommand,2,"rlF",0,NULL,0,0,0,0,0},
220     {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0},
221     {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0},
222     {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0},
223     {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0},
224     {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0},
225     {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0},
226     {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0},
227     {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
228     {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0},
229     {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0},
230     {"auth",authCommand,2,"rsltF",0,NULL,0,0,0,0,0},
231     {"ping",pingCommand,-1,"rtF",0,NULL,0,0,0,0,0},
232     {"echo",echoCommand,2,"rF",0,NULL,0,0,0,0,0},
233     {"save",saveCommand,1,"ars",0,NULL,0,0,0,0,0},
234     {"bgsave",bgsaveCommand,1,"ar",0,NULL,0,0,0,0,0},
235     {"bgrewriteaof",bgrewriteaofCommand,1,"ar",0,NULL,0,0,0,0,0},
236     {"shutdown",shutdownCommand,-1,"arlt",0,NULL,0,0,0,0,0},
237     {"lastsave",lastsaveCommand,1,"rRF",0,NULL,0,0,0,0,0},
238     {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0},
239     {"multi",multiCommand,1,"rsF",0,NULL,0,0,0,0,0},
240     {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
241     {"discard",discardCommand,1,"rsF",0,NULL,0,0,0,0,0},
242     {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
243     {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
244     {"replconf",replconfCommand,-1,"arslt",0,NULL,0,0,0,0,0},
245     {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
246     {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
247     {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0},
248     {"info",infoCommand,-1,"rlt",0,NULL,0,0,0,0,0},
249     {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
250     {"ttl",ttlCommand,2,"rF",0,NULL,1,1,1,0,0},
251     {"pttl",pttlCommand,2,"rF",0,NULL,1,1,1,0,0},
252     {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
253     {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
254     {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
255     {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
256     {"config",configCommand,-2,"art",0,NULL,0,0,0,0,0},
257     {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
258     {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
259     {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
260     {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
261     {"publish",publishCommand,3,"pltrF",0,NULL,0,0,0,0,0},
262     {"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0},
263     {"watch",watchCommand,-2,"rsF",0,NULL,1,-1,1,0,0},
264     {"unwatch",unwatchCommand,1,"rsF",0,NULL,0,0,0,0,0},
265     {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
266     {"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
267     {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
268     {"migrate",migrateCommand,-6,"w",0,migrateGetKeys,0,0,0,0,0},
269     {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0},
270     {"readonly",readonlyCommand,1,"rF",0,NULL,0,0,0,0,0},
271     {"readwrite",readwriteCommand,1,"rF",0,NULL,0,0,0,0,0},
272     {"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0},
273     {"object",objectCommand,3,"r",0,NULL,2,2,2,0,0},
274     {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
275     {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
276     {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
277     {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
278     {"script",scriptCommand,-2,"rs",0,NULL,0,0,0,0,0},
279     {"time",timeCommand,1,"rRF",0,NULL,0,0,0,0,0},
280     {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
281     {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
282     {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0},
283     {"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0},
284     {"command",commandCommand,0,"rlt",0,NULL,0,0,0,0,0},
285     {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
286     {"georadius",georadiusCommand,-6,"w",0,NULL,1,1,1,0,0},
287     {"georadiusbymember",georadiusByMemberCommand,-5,"w",0,NULL,1,1,1,0,0},
288     {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
289     {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
290     {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
291     {"pfselftest",pfselftestCommand,1,"r",0,NULL,0,0,0,0,0},
292     {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
293     {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
294     {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
295     {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
296     {"latency",latencyCommand,-2,"arslt",0,NULL,0,0,0,0,0}
297 };
298 
299 struct evictionPoolEntry *evictionPoolAlloc(void);
300 
301 /*============================ Utility functions ============================ */
302 
303 /* Low level logging. To use only for very big messages, otherwise
304  * serverLog() is to prefer. */
305 void serverLogRaw(int level, const char *msg) {
306     const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING };
307     const char *c = ".-*#";
308     FILE *fp;
309     char buf[64];
310     int rawmode = (level & LL_RAW);
311     int log_to_stdout = server.logfile[0] == '\0';
312 
313     level &= 0xff; /* clear flags */
314     if (level < server.verbosity) return;
315 
316     fp = log_to_stdout ? stdout : fopen(server.logfile,"a");
317     if (!fp) return;
318 
319     if (rawmode) {
320         fprintf(fp,"%s",msg);
321     } else {
322         int off;
323         struct timeval tv;
324         int role_char;
325         pid_t pid = getpid();
326 
327         gettimeofday(&tv,NULL);
328         off = strftime(buf,sizeof(buf),"%d %b %H:%M:%S.",localtime(&tv.tv_sec));
329         snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000);
330         if (server.sentinel_mode) {
331             role_char = 'X'; /* Sentinel. */
332         } else if (pid != server.pid) {
333             role_char = 'C'; /* RDB / AOF writing child. */
334         } else {
335             role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */
336         }
337         fprintf(fp,"%d:%c %s %c %s\n",
338             (int)getpid(),role_char, buf,c[level],msg);
339     }
340     fflush(fp);
341 
342     if (!log_to_stdout) fclose(fp);
343     if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg);
344 }
345 
346 /* Like serverLogRaw() but with printf-alike support. This is the function that
347  * is used across the code. The raw version is only used in order to dump
348  * the INFO output on crash. */
349 void serverLog(int level, const char *fmt, ...) {
350     va_list ap;
351     char msg[LOG_MAX_LEN];
352 
353     if ((level&0xff) < server.verbosity) return;
354 
355     va_start(ap, fmt);
356     vsnprintf(msg, sizeof(msg), fmt, ap);
357     va_end(ap);
358 
359     serverLogRaw(level,msg);
360 }
361 
362 /* Log a fixed message without printf-alike capabilities, in a way that is
363  * safe to call from a signal handler.
364  *
365  * We actually use this only for signals that are not fatal from the point
366  * of view of Redis. Signals that are going to kill the server anyway and
367  * where we need printf-alike features are served by serverLog(). */
368 void serverLogFromHandler(int level, const char *msg) {
369     int fd;
370     int log_to_stdout = server.logfile[0] == '\0';
371     char buf[64];
372 
373     if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize))
374         return;
375     fd = log_to_stdout ? STDOUT_FILENO :
376                          open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644);
377     if (fd == -1) return;
378     ll2string(buf,sizeof(buf),getpid());
379     if (write(fd,buf,strlen(buf)) == -1) goto err;
380     if (write(fd,":signal-handler (",17) == -1) goto err;
381     ll2string(buf,sizeof(buf),time(NULL));
382     if (write(fd,buf,strlen(buf)) == -1) goto err;
383     if (write(fd,") ",2) == -1) goto err;
384     if (write(fd,msg,strlen(msg)) == -1) goto err;
385     if (write(fd,"\n",1) == -1) goto err;
386 err:
387     if (!log_to_stdout) close(fd);
388 }
389 
390 /* Return the UNIX time in microseconds */
391 long long ustime(void) {
392     struct timeval tv;
393     long long ust;
394 
395     gettimeofday(&tv, NULL);
396     ust = ((long long)tv.tv_sec)*1000000;
397     ust += tv.tv_usec;
398     return ust;
399 }
400 
401 /* Return the UNIX time in milliseconds */
402 mstime_t mstime(void) {
403     return ustime()/1000;
404 }
405 
406 /* After an RDB dump or AOF rewrite we exit from children using _exit() instead of
407  * exit(), because the latter may interact with the same file objects used by
408  * the parent process. However if we are testing the coverage normal exit() is
409  * used in order to obtain the right coverage information. */
410 void exitFromChild(int retcode) {
411 #ifdef COVERAGE_TEST
412     exit(retcode);
413 #else
414     _exit(retcode);
415 #endif
416 }
417 
418 /*====================== Hash table type implementation  ==================== */
419 
420 /* This is a hash table type that uses the SDS dynamic strings library as
421  * keys and redis objects as values (objects can hold SDS strings,
422  * lists, sets). */
423 
424 void dictVanillaFree(void *privdata, void *val)
425 {
426     DICT_NOTUSED(privdata);
427     zfree(val);
428 }
429 
430 void dictListDestructor(void *privdata, void *val)
431 {
432     DICT_NOTUSED(privdata);
433     listRelease((list*)val);
434 }
435 
436 int dictSdsKeyCompare(void *privdata, const void *key1,
437         const void *key2)
438 {
439     int l1,l2;
440     DICT_NOTUSED(privdata);
441 
442     l1 = sdslen((sds)key1);
443     l2 = sdslen((sds)key2);
444     if (l1 != l2) return 0;
445     return memcmp(key1, key2, l1) == 0;
446 }
447 
448 /* A case insensitive version used for the command lookup table and other
449  * places where case insensitive non binary-safe comparison is needed. */
450 int dictSdsKeyCaseCompare(void *privdata, const void *key1,
451         const void *key2)
452 {
453     DICT_NOTUSED(privdata);
454 
455     return strcasecmp(key1, key2) == 0;
456 }
457 
458 void dictObjectDestructor(void *privdata, void *val)
459 {
460     DICT_NOTUSED(privdata);
461 
462     if (val == NULL) return; /* Values of swapped out keys as set to NULL */
463     decrRefCount(val);
464 }
465 
466 void dictSdsDestructor(void *privdata, void *val)
467 {
468     DICT_NOTUSED(privdata);
469 
470     sdsfree(val);
471 }
472 
473 int dictObjKeyCompare(void *privdata, const void *key1,
474         const void *key2)
475 {
476     const robj *o1 = key1, *o2 = key2;
477     return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
478 }
479 
480 unsigned int dictObjHash(const void *key) {
481     const robj *o = key;
482     return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
483 }
484 
485 unsigned int dictSdsHash(const void *key) {
486     return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
487 }
488 
489 unsigned int dictSdsCaseHash(const void *key) {
490     return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key));
491 }
492 
493 int dictEncObjKeyCompare(void *privdata, const void *key1,
494         const void *key2)
495 {
496     robj *o1 = (robj*) key1, *o2 = (robj*) key2;
497     int cmp;
498 
499     if (o1->encoding == OBJ_ENCODING_INT &&
500         o2->encoding == OBJ_ENCODING_INT)
501             return o1->ptr == o2->ptr;
502 
503     o1 = getDecodedObject(o1);
504     o2 = getDecodedObject(o2);
505     cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
506     decrRefCount(o1);
507     decrRefCount(o2);
508     return cmp;
509 }
510 
511 unsigned int dictEncObjHash(const void *key) {
512     robj *o = (robj*) key;
513 
514     if (sdsEncodedObject(o)) {
515         return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
516     } else {
517         if (o->encoding == OBJ_ENCODING_INT) {
518             char buf[32];
519             int len;
520 
521             len = ll2string(buf,32,(long)o->ptr);
522             return dictGenHashFunction((unsigned char*)buf, len);
523         } else {
524             unsigned int hash;
525 
526             o = getDecodedObject(o);
527             hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
528             decrRefCount(o);
529             return hash;
530         }
531     }
532 }
533 
534 /* Sets type hash table */
535 dictType setDictType = {
536     dictEncObjHash,            /* hash function */
537     NULL,                      /* key dup */
538     NULL,                      /* val dup */
539     dictEncObjKeyCompare,      /* key compare */
540     dictObjectDestructor, /* key destructor */
541     NULL                       /* val destructor */
542 };
543 
544 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
545 dictType zsetDictType = {
546     dictEncObjHash,            /* hash function */
547     NULL,                      /* key dup */
548     NULL,                      /* val dup */
549     dictEncObjKeyCompare,      /* key compare */
550     dictObjectDestructor, /* key destructor */
551     NULL                       /* val destructor */
552 };
553 
554 /* Db->dict, keys are sds strings, vals are Redis objects. */
555 dictType dbDictType = {
556     dictSdsHash,                /* hash function */
557     NULL,                       /* key dup */
558     NULL,                       /* val dup */
559     dictSdsKeyCompare,          /* key compare */
560     dictSdsDestructor,          /* key destructor */
561     dictObjectDestructor   /* val destructor */
562 };
563 
564 /* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */
565 dictType shaScriptObjectDictType = {
566     dictSdsCaseHash,            /* hash function */
567     NULL,                       /* key dup */
568     NULL,                       /* val dup */
569     dictSdsKeyCaseCompare,      /* key compare */
570     dictSdsDestructor,          /* key destructor */
571     dictObjectDestructor   /* val destructor */
572 };
573 
574 /* Db->expires */
575 dictType keyptrDictType = {
576     dictSdsHash,               /* hash function */
577     NULL,                      /* key dup */
578     NULL,                      /* val dup */
579     dictSdsKeyCompare,         /* key compare */
580     NULL,                      /* key destructor */
581     NULL                       /* val destructor */
582 };
583 
584 /* Command table. sds string -> command struct pointer. */
585 dictType commandTableDictType = {
586     dictSdsCaseHash,           /* hash function */
587     NULL,                      /* key dup */
588     NULL,                      /* val dup */
589     dictSdsKeyCaseCompare,     /* key compare */
590     dictSdsDestructor,         /* key destructor */
591     NULL                       /* val destructor */
592 };
593 
594 /* Hash type hash table (note that small hashes are represented with ziplists) */
595 dictType hashDictType = {
596     dictEncObjHash,             /* hash function */
597     NULL,                       /* key dup */
598     NULL,                       /* val dup */
599     dictEncObjKeyCompare,       /* key compare */
600     dictObjectDestructor,  /* key destructor */
601     dictObjectDestructor   /* val destructor */
602 };
603 
604 /* Keylist hash table type has unencoded redis objects as keys and
605  * lists as values. It's used for blocking operations (BLPOP) and to
606  * map swapped keys to a list of clients waiting for this keys to be loaded. */
607 dictType keylistDictType = {
608     dictObjHash,                /* hash function */
609     NULL,                       /* key dup */
610     NULL,                       /* val dup */
611     dictObjKeyCompare,          /* key compare */
612     dictObjectDestructor,  /* key destructor */
613     dictListDestructor          /* val destructor */
614 };
615 
616 /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
617  * clusterNode structures. */
618 dictType clusterNodesDictType = {
619     dictSdsHash,                /* hash function */
620     NULL,                       /* key dup */
621     NULL,                       /* val dup */
622     dictSdsKeyCompare,          /* key compare */
623     dictSdsDestructor,          /* key destructor */
624     NULL                        /* val destructor */
625 };
626 
627 /* Cluster re-addition blacklist. This maps node IDs to the time
628  * we can re-add this node. The goal is to avoid readding a removed
629  * node for some time. */
630 dictType clusterNodesBlackListDictType = {
631     dictSdsCaseHash,            /* hash function */
632     NULL,                       /* key dup */
633     NULL,                       /* val dup */
634     dictSdsKeyCaseCompare,      /* key compare */
635     dictSdsDestructor,          /* key destructor */
636     NULL                        /* val destructor */
637 };
638 
639 /* Migrate cache dict type. */
640 dictType migrateCacheDictType = {
641     dictSdsHash,                /* hash function */
642     NULL,                       /* key dup */
643     NULL,                       /* val dup */
644     dictSdsKeyCompare,          /* key compare */
645     dictSdsDestructor,          /* key destructor */
646     NULL                        /* val destructor */
647 };
648 
649 /* Replication cached script dict (server.repl_scriptcache_dict).
650  * Keys are sds SHA1 strings, while values are not used at all in the current
651  * implementation. */
652 dictType replScriptCacheDictType = {
653     dictSdsCaseHash,            /* hash function */
654     NULL,                       /* key dup */
655     NULL,                       /* val dup */
656     dictSdsKeyCaseCompare,      /* key compare */
657     dictSdsDestructor,          /* key destructor */
658     NULL                        /* val destructor */
659 };
660 
661 int htNeedsResize(dict *dict) {
662     long long size, used;
663 
664     size = dictSlots(dict);
665     used = dictSize(dict);
666     return (size && used && size > DICT_HT_INITIAL_SIZE &&
667             (used*100/size < HASHTABLE_MIN_FILL));
668 }
669 
670 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
671  * we resize the hash table to save memory */
672 void tryResizeHashTables(int dbid) {
673     if (htNeedsResize(server.db[dbid].dict))
674         dictResize(server.db[dbid].dict);
675     if (htNeedsResize(server.db[dbid].expires))
676         dictResize(server.db[dbid].expires);
677 }
678 
679 /* Our hash table implementation performs rehashing incrementally while
680  * we write/read from the hash table. Still if the server is idle, the hash
681  * table will use two tables for a long time. So we try to use 1 millisecond
682  * of CPU time at every call of this function to perform some rehahsing.
683  *
684  * The function returns 1 if some rehashing was performed, otherwise 0
685  * is returned. */
686 int incrementallyRehash(int dbid) {
687     /* Keys dictionary */
688     if (dictIsRehashing(server.db[dbid].dict)) {
689         dictRehashMilliseconds(server.db[dbid].dict,1);
690         return 1; /* already used our millisecond for this loop... */
691     }
692     /* Expires */
693     if (dictIsRehashing(server.db[dbid].expires)) {
694         dictRehashMilliseconds(server.db[dbid].expires,1);
695         return 1; /* already used our millisecond for this loop... */
696     }
697     return 0;
698 }
699 
700 /* This function is called once a background process of some kind terminates,
701  * as we want to avoid resizing the hash tables when there is a child in order
702  * to play well with copy-on-write (otherwise when a resize happens lots of
703  * memory pages are copied). The goal of this function is to update the ability
704  * for dict.c to resize the hash tables accordingly to the fact we have o not
705  * running childs. */
706 void updateDictResizePolicy(void) {
707     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
708         dictEnableResize();
709     else
710         dictDisableResize();
711 }
712 
713 /* ======================= Cron: called every 100 ms ======================== */
714 
715 /* Helper function for the activeExpireCycle() function.
716  * This function will try to expire the key that is stored in the hash table
717  * entry 'de' of the 'expires' hash table of a Redis database.
718  *
719  * If the key is found to be expired, it is removed from the database and
720  * 1 is returned. Otherwise no operation is performed and 0 is returned.
721  *
722  * When a key is expired, server.stat_expiredkeys is incremented.
723  *
724  * The parameter 'now' is the current time in milliseconds as is passed
725  * to the function to avoid too many gettimeofday() syscalls. */
726 int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
727     long long t = dictGetSignedIntegerVal(de);
728     if (now > t) {
729         sds key = dictGetKey(de);
730         robj *keyobj = createStringObject(key,sdslen(key));
731 
732         propagateExpire(db,keyobj);
733         dbDelete(db,keyobj);
734         notifyKeyspaceEvent(NOTIFY_EXPIRED,
735             "expired",keyobj,db->id);
736         decrRefCount(keyobj);
737         server.stat_expiredkeys++;
738         return 1;
739     } else {
740         return 0;
741     }
742 }
743 
744 /* Try to expire a few timed out keys. The algorithm used is adaptive and
745  * will use few CPU cycles if there are few expiring keys, otherwise
746  * it will get more aggressive to avoid that too much memory is used by
747  * keys that can be removed from the keyspace.
748  *
749  * No more than CRON_DBS_PER_CALL databases are tested at every
750  * iteration.
751  *
752  * This kind of call is used when Redis detects that timelimit_exit is
753  * true, so there is more work to do, and we do it more incrementally from
754  * the beforeSleep() function of the event loop.
755  *
756  * Expire cycle type:
757  *
758  * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
759  * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
760  * microseconds, and is not repeated again before the same amount of time.
761  *
762  * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
763  * executed, where the time limit is a percentage of the REDIS_HZ period
764  * as specified by the REDIS_EXPIRELOOKUPS_TIME_PERC define. */
765 
766 void activeExpireCycle(int type) {
767     /* This function has some global state in order to continue the work
768      * incrementally across calls. */
769     static unsigned int current_db = 0; /* Last DB tested. */
770     static int timelimit_exit = 0;      /* Time limit hit in previous call? */
771     static long long last_fast_cycle = 0; /* When last fast cycle ran. */
772 
773     int j, iteration = 0;
774     int dbs_per_call = CRON_DBS_PER_CALL;
775     long long start = ustime(), timelimit;
776 
777     if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
778         /* Don't start a fast cycle if the previous cycle did not exited
779          * for time limt. Also don't repeat a fast cycle for the same period
780          * as the fast cycle total duration itself. */
781         if (!timelimit_exit) return;
782         if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
783         last_fast_cycle = start;
784     }
785 
786     /* We usually should test CRON_DBS_PER_CALL per iteration, with
787      * two exceptions:
788      *
789      * 1) Don't test more DBs than we have.
790      * 2) If last time we hit the time limit, we want to scan all DBs
791      * in this iteration, as there is work to do in some DB and we don't want
792      * expired keys to use memory for too much time. */
793     if (dbs_per_call > server.dbnum || timelimit_exit)
794         dbs_per_call = server.dbnum;
795 
796     /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
797      * per iteration. Since this function gets called with a frequency of
798      * server.hz times per second, the following is the max amount of
799      * microseconds we can spend in this function. */
800     timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
801     timelimit_exit = 0;
802     if (timelimit <= 0) timelimit = 1;
803 
804     if (type == ACTIVE_EXPIRE_CYCLE_FAST)
805         timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */
806 
807     for (j = 0; j < dbs_per_call; j++) {
808         int expired;
809         redisDb *db = server.db+(current_db % server.dbnum);
810 
811         /* Increment the DB now so we are sure if we run out of time
812          * in the current DB we'll restart from the next. This allows to
813          * distribute the time evenly across DBs. */
814         current_db++;
815 
816         /* Continue to expire if at the end of the cycle more than 25%
817          * of the keys were expired. */
818         do {
819             unsigned long num, slots;
820             long long now, ttl_sum;
821             int ttl_samples;
822 
823             /* If there is nothing to expire try next DB ASAP. */
824             if ((num = dictSize(db->expires)) == 0) {
825                 db->avg_ttl = 0;
826                 break;
827             }
828             slots = dictSlots(db->expires);
829             now = mstime();
830 
831             /* When there are less than 1% filled slots getting random
832              * keys is expensive, so stop here waiting for better times...
833              * The dictionary will be resized asap. */
834             if (num && slots > DICT_HT_INITIAL_SIZE &&
835                 (num*100/slots < 1)) break;
836 
837             /* The main collection cycle. Sample random keys among keys
838              * with an expire set, checking for expired ones. */
839             expired = 0;
840             ttl_sum = 0;
841             ttl_samples = 0;
842 
843             if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
844                 num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
845 
846             while (num--) {
847                 dictEntry *de;
848                 long long ttl;
849 
850                 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
851                 ttl = dictGetSignedIntegerVal(de)-now;
852                 if (activeExpireCycleTryExpire(db,de,now)) expired++;
853                 if (ttl > 0) {
854                     /* We want the average TTL of keys yet not expired. */
855                     ttl_sum += ttl;
856                     ttl_samples++;
857                 }
858             }
859 
860             /* Update the average TTL stats for this database. */
861             if (ttl_samples) {
862                 long long avg_ttl = ttl_sum/ttl_samples;
863 
864                 /* Do a simple running average with a few samples.
865                  * We just use the current estimate with a weight of 2%
866                  * and the previous estimate with a weight of 98%. */
867                 if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
868                 db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
869             }
870 
871             /* We can't block forever here even if there are many keys to
872              * expire. So after a given amount of milliseconds return to the
873              * caller waiting for the other active expire cycle. */
874             iteration++;
875             if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
876                 long long elapsed = ustime()-start;
877 
878                 latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
879                 if (elapsed > timelimit) timelimit_exit = 1;
880             }
881             if (timelimit_exit) return;
882             /* We don't repeat the cycle if there are less than 25% of keys
883              * found expired in the current DB. */
884         } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
885     }
886 }
887 
888 unsigned int getLRUClock(void) {
889     return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
890 }
891 
892 /* Add a sample to the operations per second array of samples. */
893 void trackInstantaneousMetric(int metric, long long current_reading) {
894     long long t = mstime() - server.inst_metric[metric].last_sample_time;
895     long long ops = current_reading -
896                     server.inst_metric[metric].last_sample_count;
897     long long ops_sec;
898 
899     ops_sec = t > 0 ? (ops*1000/t) : 0;
900 
901     server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
902         ops_sec;
903     server.inst_metric[metric].idx++;
904     server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
905     server.inst_metric[metric].last_sample_time = mstime();
906     server.inst_metric[metric].last_sample_count = current_reading;
907 }
908 
909 /* Return the mean of all the samples. */
910 long long getInstantaneousMetric(int metric) {
911     int j;
912     long long sum = 0;
913 
914     for (j = 0; j < STATS_METRIC_SAMPLES; j++)
915         sum += server.inst_metric[metric].samples[j];
916     return sum / STATS_METRIC_SAMPLES;
917 }
918 
919 /* Check for timeouts. Returns non-zero if the client was terminated.
920  * The function gets the current time in milliseconds as argument since
921  * it gets called multiple times in a loop, so calling gettimeofday() for
922  * each iteration would be costly without any actual gain. */
923 int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
924     time_t now = now_ms/1000;
925 
926     if (server.maxidletime &&
927         !(c->flags & CLIENT_SLAVE) &&    /* no timeout for slaves */
928         !(c->flags & CLIENT_MASTER) &&   /* no timeout for masters */
929         !(c->flags & CLIENT_BLOCKED) &&  /* no timeout for BLPOP */
930         !(c->flags & CLIENT_PUBSUB) &&   /* no timeout for Pub/Sub clients */
931         (now - c->lastinteraction > server.maxidletime))
932     {
933         serverLog(LL_VERBOSE,"Closing idle client");
934         freeClient(c);
935         return 1;
936     } else if (c->flags & CLIENT_BLOCKED) {
937         /* Blocked OPS timeout is handled with milliseconds resolution.
938          * However note that the actual resolution is limited by
939          * server.hz. */
940 
941         if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
942             /* Handle blocking operation specific timeout. */
943             replyToBlockedClientTimedOut(c);
944             unblockClient(c);
945         } else if (server.cluster_enabled) {
946             /* Cluster: handle unblock & redirect of clients blocked
947              * into keys no longer served by this server. */
948             if (clusterRedirectBlockedClientIfNeeded(c))
949                 unblockClient(c);
950         }
951     }
952     return 0;
953 }
954 
955 /* The client query buffer is an sds.c string that can end with a lot of
956  * free space not used, this function reclaims space if needed.
957  *
958  * The function always returns 0 as it never terminates the client. */
959 int clientsCronResizeQueryBuffer(client *c) {
960     size_t querybuf_size = sdsAllocSize(c->querybuf);
961     time_t idletime = server.unixtime - c->lastinteraction;
962 
963     /* There are two conditions to resize the query buffer:
964      * 1) Query buffer is > BIG_ARG and too big for latest peak.
965      * 2) Client is inactive and the buffer is bigger than 1k. */
966     if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
967          (querybuf_size/(c->querybuf_peak+1)) > 2) ||
968          (querybuf_size > 1024 && idletime > 2))
969     {
970         /* Only resize the query buffer if it is actually wasting space. */
971         if (sdsavail(c->querybuf) > 1024) {
972             c->querybuf = sdsRemoveFreeSpace(c->querybuf);
973         }
974     }
975     /* Reset the peak again to capture the peak memory usage in the next
976      * cycle. */
977     c->querybuf_peak = 0;
978     return 0;
979 }
980 
981 #define CLIENTS_CRON_MIN_ITERATIONS 5
982 void clientsCron(void) {
983     /* Make sure to process at least numclients/server.hz of clients
984      * per call. Since this function is called server.hz times per second
985      * we are sure that in the worst case we process all the clients in 1
986      * second. */
987     int numclients = listLength(server.clients);
988     int iterations = numclients/server.hz;
989     mstime_t now = mstime();
990 
991     /* Process at least a few clients while we are at it, even if we need
992      * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
993      * of processing each client once per second. */
994     if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
995         iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
996                      numclients : CLIENTS_CRON_MIN_ITERATIONS;
997 
998     while(listLength(server.clients) && iterations--) {
999         client *c;
1000         listNode *head;
1001 
1002         /* Rotate the list, take the current head, process.
1003          * This way if the client must be removed from the list it's the
1004          * first element and we don't incur into O(N) computation. */
1005         listRotate(server.clients);
1006         head = listFirst(server.clients);
1007         c = listNodeValue(head);
1008         /* The following functions do different service checks on the client.
1009          * The protocol is that they return non-zero if the client was
1010          * terminated. */
1011         if (clientsCronHandleTimeout(c,now)) continue;
1012         if (clientsCronResizeQueryBuffer(c)) continue;
1013     }
1014 }
1015 
1016 /* This function handles 'background' operations we are required to do
1017  * incrementally in Redis databases, such as active key expiring, resizing,
1018  * rehashing. */
1019 void databasesCron(void) {
1020     /* Expire keys by random sampling. Not required for slaves
1021      * as master will synthesize DELs for us. */
1022     if (server.active_expire_enabled && server.masterhost == NULL)
1023         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
1024 
1025     /* Perform hash tables rehashing if needed, but only if there are no
1026      * other processes saving the DB on disk. Otherwise rehashing is bad
1027      * as will cause a lot of copy-on-write of memory pages. */
1028     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
1029         /* We use global counters so if we stop the computation at a given
1030          * DB we'll be able to start from the successive in the next
1031          * cron loop iteration. */
1032         static unsigned int resize_db = 0;
1033         static unsigned int rehash_db = 0;
1034         int dbs_per_call = CRON_DBS_PER_CALL;
1035         int j;
1036 
1037         /* Don't test more DBs than we have. */
1038         if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
1039 
1040         /* Resize */
1041         for (j = 0; j < dbs_per_call; j++) {
1042             tryResizeHashTables(resize_db % server.dbnum);
1043             resize_db++;
1044         }
1045 
1046         /* Rehash */
1047         if (server.activerehashing) {
1048             for (j = 0; j < dbs_per_call; j++) {
1049                 int work_done = incrementallyRehash(rehash_db % server.dbnum);
1050                 rehash_db++;
1051                 if (work_done) {
1052                     /* If the function did some work, stop here, we'll do
1053                      * more at the next cron loop. */
1054                     break;
1055                 }
1056             }
1057         }
1058     }
1059 }
1060 
1061 /* We take a cached value of the unix time in the global state because with
1062  * virtual memory and aging there is to store the current time in objects at
1063  * every object access, and accuracy is not needed. To access a global var is
1064  * a lot faster than calling time(NULL) */
1065 void updateCachedTime(void) {
1066     server.unixtime = time(NULL);
1067     server.mstime = mstime();
1068 }
1069 
1070 /* This is our timer interrupt, called server.hz times per second.
1071  * Here is where we do a number of things that need to be done asynchronously.
1072  * For instance:
1073  *
1074  * - Active expired keys collection (it is also performed in a lazy way on
1075  *   lookup).
1076  * - Software watchdog.
1077  * - Update some statistic.
1078  * - Incremental rehashing of the DBs hash tables.
1079  * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
1080  * - Clients timeout of different kinds.
1081  * - Replication reconnection.
1082  * - Many more...
1083  *
1084  * Everything directly called here will be called server.hz times per second,
1085  * so in order to throttle execution of things we want to do less frequently
1086  * a macro is used: run_with_period(milliseconds) { .... }
1087  */
1088 
1089 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1090     int j;
1091     UNUSED(eventLoop);
1092     UNUSED(id);
1093     UNUSED(clientData);
1094 
1095     /* Software watchdog: deliver the SIGALRM that will reach the signal
1096      * handler if we don't return here fast enough. */
1097     if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
1098 
1099     /* Update the time cache. */
1100     updateCachedTime();
1101 
1102     run_with_period(100) {
1103         trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
1104         trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
1105                 server.stat_net_input_bytes);
1106         trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
1107                 server.stat_net_output_bytes);
1108     }
1109 
1110     /* We have just LRU_BITS bits per object for LRU information.
1111      * So we use an (eventually wrapping) LRU clock.
1112      *
1113      * Note that even if the counter wraps it's not a big problem,
1114      * everything will still work but some object will appear younger
1115      * to Redis. However for this to happen a given object should never be
1116      * touched for all the time needed to the counter to wrap, which is
1117      * not likely.
1118      *
1119      * Note that you can change the resolution altering the
1120      * LRU_CLOCK_RESOLUTION define. */
1121     server.lruclock = getLRUClock();
1122 
1123     /* Record the max memory used since the server was started. */
1124     if (zmalloc_used_memory() > server.stat_peak_memory)
1125         server.stat_peak_memory = zmalloc_used_memory();
1126 
1127     /* Sample the RSS here since this is a relatively slow call. */
1128     server.resident_set_size = zmalloc_get_rss();
1129 
1130     /* We received a SIGTERM, shutting down here in a safe way, as it is
1131      * not ok doing so inside the signal handler. */
1132     if (server.shutdown_asap) {
1133         if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
1134         serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
1135         server.shutdown_asap = 0;
1136     }
1137 
1138     /* Show some info about non-empty databases */
1139     run_with_period(5000) {
1140         for (j = 0; j < server.dbnum; j++) {
1141             long long size, used, vkeys;
1142 
1143             size = dictSlots(server.db[j].dict);
1144             used = dictSize(server.db[j].dict);
1145             vkeys = dictSize(server.db[j].expires);
1146             if (used || vkeys) {
1147                 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1148                 /* dictPrintStats(server.dict); */
1149             }
1150         }
1151     }
1152 
1153     /* Show information about connected clients */
1154     if (!server.sentinel_mode) {
1155         run_with_period(5000) {
1156             serverLog(LL_VERBOSE,
1157                 "%lu clients connected (%lu slaves), %zu bytes in use",
1158                 listLength(server.clients)-listLength(server.slaves),
1159                 listLength(server.slaves),
1160                 zmalloc_used_memory());
1161         }
1162     }
1163 
1164     /* We need to do a few operations on clients asynchronously. */
1165     clientsCron();
1166 
1167     /* Handle background operations on Redis databases. */
1168     databasesCron();
1169 
1170     /* Start a scheduled AOF rewrite if this was requested by the user while
1171      * a BGSAVE was in progress. */
1172     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
1173         server.aof_rewrite_scheduled)
1174     {
1175         rewriteAppendOnlyFileBackground();
1176     }
1177 
1178     /* Check if a background saving or AOF rewrite in progress terminated. */
1179     if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
1180         ldbPendingChildren())
1181     {
1182         int statloc;
1183         pid_t pid;
1184 
1185         if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1186             int exitcode = WEXITSTATUS(statloc);
1187             int bysignal = 0;
1188 
1189             if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
1190 
1191             if (pid == -1) {
1192                 serverLog(LL_WARNING,"wait3() returned an error: %s. "
1193                     "rdb_child_pid = %d, aof_child_pid = %d",
1194                     strerror(errno),
1195                     (int) server.rdb_child_pid,
1196                     (int) server.aof_child_pid);
1197             } else if (pid == server.rdb_child_pid) {
1198                 backgroundSaveDoneHandler(exitcode,bysignal);
1199             } else if (pid == server.aof_child_pid) {
1200                 backgroundRewriteDoneHandler(exitcode,bysignal);
1201             } else {
1202                 if (!ldbRemoveChild(pid)) {
1203                     serverLog(LL_WARNING,
1204                         "Warning, detected child with unmatched pid: %ld",
1205                         (long)pid);
1206                 }
1207             }
1208             updateDictResizePolicy();
1209         }
1210     } else {
1211         /* If there is not a background saving/rewrite in progress check if
1212          * we have to save/rewrite now */
1213          for (j = 0; j < server.saveparamslen; j++) {
1214             struct saveparam *sp = server.saveparams+j;
1215 
1216             /* Save if we reached the given amount of changes,
1217              * the given amount of seconds, and if the latest bgsave was
1218              * successful or if, in case of an error, at least
1219              * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
1220             if (server.dirty >= sp->changes &&
1221                 server.unixtime-server.lastsave > sp->seconds &&
1222                 (server.unixtime-server.lastbgsave_try >
1223                  CONFIG_BGSAVE_RETRY_DELAY ||
1224                  server.lastbgsave_status == C_OK))
1225             {
1226                 serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
1227                     sp->changes, (int)sp->seconds);
1228                 rdbSaveBackground(server.rdb_filename);
1229                 break;
1230             }
1231          }
1232 
1233          /* Trigger an AOF rewrite if needed */
1234          if (server.rdb_child_pid == -1 &&
1235              server.aof_child_pid == -1 &&
1236              server.aof_rewrite_perc &&
1237              server.aof_current_size > server.aof_rewrite_min_size)
1238          {
1239             long long base = server.aof_rewrite_base_size ?
1240                             server.aof_rewrite_base_size : 1;
1241             long long growth = (server.aof_current_size*100/base) - 100;
1242             if (growth >= server.aof_rewrite_perc) {
1243                 serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
1244                 rewriteAppendOnlyFileBackground();
1245             }
1246          }
1247     }
1248 
1249 
1250     /* AOF postponed flush: Try at every cron cycle if the slow fsync
1251      * completed. */
1252     if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
1253 
1254     /* AOF write errors: in this case we have a buffer to flush as well and
1255      * clear the AOF error in case of success to make the DB writable again,
1256      * however to try every second is enough in case of 'hz' is set to
1257      * an higher frequency. */
1258     run_with_period(1000) {
1259         if (server.aof_last_write_status == C_ERR)
1260             flushAppendOnlyFile(0);
1261     }
1262 
1263     /* Close clients that need to be closed asynchronous */
1264     freeClientsInAsyncFreeQueue();
1265 
1266     /* Clear the paused clients flag if needed. */
1267     clientsArePaused(); /* Don't check return value, just use the side effect. */
1268 
1269     /* Replication cron function -- used to reconnect to master and
1270      * to detect transfer failures. */
1271     run_with_period(1000) replicationCron();
1272 
1273     /* Run the Redis Cluster cron. */
1274     run_with_period(100) {
1275         if (server.cluster_enabled) clusterCron();
1276     }
1277 
1278     /* Run the Sentinel timer if we are in sentinel mode. */
1279     run_with_period(100) {
1280         if (server.sentinel_mode) sentinelTimer();
1281     }
1282 
1283     /* Cleanup expired MIGRATE cached sockets. */
1284     run_with_period(1000) {
1285         migrateCloseTimedoutSockets();
1286     }
1287 
1288     server.cronloops++;
1289     return 1000/server.hz;
1290 }
1291 
1292 /* This function gets called every time Redis is entering the
1293  * main loop of the event driven library, that is, before to sleep
1294  * for ready file descriptors. */
1295 void beforeSleep(struct aeEventLoop *eventLoop) {
1296     UNUSED(eventLoop);
1297 
1298     /* Call the Redis Cluster before sleep function. Note that this function
1299      * may change the state of Redis Cluster (from ok to fail or vice versa),
1300      * so it's a good idea to call it before serving the unblocked clients
1301      * later in this function. */
1302     if (server.cluster_enabled) clusterBeforeSleep();
1303 
1304     /* Run a fast expire cycle (the called function will return
1305      * ASAP if a fast cycle is not needed). */
1306     if (server.active_expire_enabled && server.masterhost == NULL)
1307         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
1308 
1309     /* Send all the slaves an ACK request if at least one client blocked
1310      * during the previous event loop iteration. */
1311     if (server.get_ack_from_slaves) {
1312         robj *argv[3];
1313 
1314         argv[0] = createStringObject("REPLCONF",8);
1315         argv[1] = createStringObject("GETACK",6);
1316         argv[2] = createStringObject("*",1); /* Not used argument. */
1317         replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
1318         decrRefCount(argv[0]);
1319         decrRefCount(argv[1]);
1320         decrRefCount(argv[2]);
1321         server.get_ack_from_slaves = 0;
1322     }
1323 
1324     /* Unblock all the clients blocked for synchronous replication
1325      * in WAIT. */
1326     if (listLength(server.clients_waiting_acks))
1327         processClientsWaitingReplicas();
1328 
1329     /* Try to process pending commands for clients that were just unblocked. */
1330     if (listLength(server.unblocked_clients))
1331         processUnblockedClients();
1332 
1333     /* Write the AOF buffer on disk */
1334     flushAppendOnlyFile(0);
1335 
1336     /* Handle writes with pending output buffers. */
1337     handleClientsWithPendingWrites();
1338 }
1339 
1340 /* =========================== Server initialization ======================== */
1341 
1342 void createSharedObjects(void) {
1343     int j;
1344 
1345     shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
1346     shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
1347     shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
1348     shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
1349     shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
1350     shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
1351     shared.cnegone = createObject(OBJ_STRING,sdsnew(":-1\r\n"));
1352     shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
1353     shared.nullmultibulk = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
1354     shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n"));
1355     shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
1356     shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
1357     shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
1358     shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
1359         "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
1360     shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
1361         "-ERR no such key\r\n"));
1362     shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
1363         "-ERR syntax error\r\n"));
1364     shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
1365         "-ERR source and destination objects are the same\r\n"));
1366     shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
1367         "-ERR index out of range\r\n"));
1368     shared.noscripterr = createObject(OBJ_STRING,sdsnew(
1369         "-NOSCRIPT No matching script. Please use EVAL.\r\n"));
1370     shared.loadingerr = createObject(OBJ_STRING,sdsnew(
1371         "-LOADING Redis is loading the dataset in memory\r\n"));
1372     shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
1373         "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
1374     shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
1375         "-MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n"));
1376     shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
1377         "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n"));
1378     shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
1379         "-READONLY You can't write against a read only slave.\r\n"));
1380     shared.noautherr = createObject(OBJ_STRING,sdsnew(
1381         "-NOAUTH Authentication required.\r\n"));
1382     shared.oomerr = createObject(OBJ_STRING,sdsnew(
1383         "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
1384     shared.execaborterr = createObject(OBJ_STRING,sdsnew(
1385         "-EXECABORT Transaction discarded because of previous errors.\r\n"));
1386     shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
1387         "-NOREPLICAS Not enough good slaves to write.\r\n"));
1388     shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
1389         "-BUSYKEY Target key name already exists.\r\n"));
1390     shared.space = createObject(OBJ_STRING,sdsnew(" "));
1391     shared.colon = createObject(OBJ_STRING,sdsnew(":"));
1392     shared.plus = createObject(OBJ_STRING,sdsnew("+"));
1393 
1394     for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
1395         char dictid_str[64];
1396         int dictid_len;
1397 
1398         dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
1399         shared.select[j] = createObject(OBJ_STRING,
1400             sdscatprintf(sdsempty(),
1401                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
1402                 dictid_len, dictid_str));
1403     }
1404     shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
1405     shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
1406     shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
1407     shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
1408     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
1409     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
1410     shared.del = createStringObject("DEL",3);
1411     shared.rpop = createStringObject("RPOP",4);
1412     shared.lpop = createStringObject("LPOP",4);
1413     shared.lpush = createStringObject("LPUSH",5);
1414     for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
1415         shared.integers[j] = createObject(OBJ_STRING,(void*)(long)j);
1416         shared.integers[j]->encoding = OBJ_ENCODING_INT;
1417     }
1418     for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {
1419         shared.mbulkhdr[j] = createObject(OBJ_STRING,
1420             sdscatprintf(sdsempty(),"*%d\r\n",j));
1421         shared.bulkhdr[j] = createObject(OBJ_STRING,
1422             sdscatprintf(sdsempty(),"$%d\r\n",j));
1423     }
1424     /* The following two shared objects, minstring and maxstrings, are not
1425      * actually used for their value but as a special object meaning
1426      * respectively the minimum possible string and the maximum possible
1427      * string in string comparisons for the ZRANGEBYLEX command. */
1428     shared.minstring = createStringObject("minstring",9);
1429     shared.maxstring = createStringObject("maxstring",9);
1430 }
1431 
1432 void initServerConfig(void) {
1433     int j;
1434 
1435     getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
1436     server.configfile = NULL;
1437     server.executable = NULL;
1438     server.hz = CONFIG_DEFAULT_HZ;
1439     server.runid[CONFIG_RUN_ID_SIZE] = '\0';
1440     server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
1441     server.port = CONFIG_DEFAULT_SERVER_PORT;
1442     server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
1443     server.bindaddr_count = 0;
1444     server.unixsocket = NULL;
1445     server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
1446     server.ipfd_count = 0;
1447     server.sofd = -1;
1448     server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE;
1449     server.dbnum = CONFIG_DEFAULT_DBNUM;
1450     server.verbosity = CONFIG_DEFAULT_VERBOSITY;
1451     server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
1452     server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE;
1453     server.active_expire_enabled = 1;
1454     server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
1455     server.saveparams = NULL;
1456     server.loading = 0;
1457     server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);
1458     server.syslog_enabled = CONFIG_DEFAULT_SYSLOG_ENABLED;
1459     server.syslog_ident = zstrdup(CONFIG_DEFAULT_SYSLOG_IDENT);
1460     server.syslog_facility = LOG_LOCAL0;
1461     server.daemonize = CONFIG_DEFAULT_DAEMONIZE;
1462     server.supervised = 0;
1463     server.supervised_mode = SUPERVISED_NONE;
1464     server.aof_state = AOF_OFF;
1465     server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC;
1466     server.aof_no_fsync_on_rewrite = CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE;
1467     server.aof_rewrite_perc = AOF_REWRITE_PERC;
1468     server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
1469     server.aof_rewrite_base_size = 0;
1470     server.aof_rewrite_scheduled = 0;
1471     server.aof_last_fsync = time(NULL);
1472     server.aof_rewrite_time_last = -1;
1473     server.aof_rewrite_time_start = -1;
1474     server.aof_lastbgrewrite_status = C_OK;
1475     server.aof_delayed_fsync = 0;
1476     server.aof_fd = -1;
1477     server.aof_selected_db = -1; /* Make sure the first time will not match */
1478     server.aof_flush_postponed_start = 0;
1479     server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
1480     server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
1481     server.pidfile = NULL;
1482     server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
1483     server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
1484     server.requirepass = NULL;
1485     server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION;
1486     server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM;
1487     server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
1488     server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING;
1489     server.notify_keyspace_events = 0;
1490     server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
1491     server.bpop_blocked_clients = 0;
1492     server.maxmemory = CONFIG_DEFAULT_MAXMEMORY;
1493     server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY;
1494     server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES;
1495     server.hash_max_ziplist_entries = OBJ_HASH_MAX_ZIPLIST_ENTRIES;
1496     server.hash_max_ziplist_value = OBJ_HASH_MAX_ZIPLIST_VALUE;
1497     server.list_max_ziplist_size = OBJ_LIST_MAX_ZIPLIST_SIZE;
1498     server.list_compress_depth = OBJ_LIST_COMPRESS_DEPTH;
1499     server.set_max_intset_entries = OBJ_SET_MAX_INTSET_ENTRIES;
1500     server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES;
1501     server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE;
1502     server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES;
1503     server.shutdown_asap = 0;
1504     server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
1505     server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
1506     server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE;
1507     server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG;
1508     server.cluster_enabled = 0;
1509     server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT;
1510     server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER;
1511     server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY;
1512     server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE;
1513     server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
1514     server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
1515     server.next_client_id = 1; /* Client IDs, start from 1 .*/
1516     server.loading_process_events_interval_bytes = (1024*1024*2);
1517 
1518     server.lruclock = getLRUClock();
1519     resetServerSaveParams();
1520 
1521     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
1522     appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
1523     appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1524     /* Replication related */
1525     server.masterauth = NULL;
1526     server.masterhost = NULL;
1527     server.masterport = 6379;
1528     server.master = NULL;
1529     server.cached_master = NULL;
1530     server.repl_master_initial_offset = -1;
1531     server.repl_state = REPL_STATE_NONE;
1532     server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
1533     server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
1534     server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
1535     server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
1536     server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
1537     server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
1538     server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
1539     server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY;
1540     server.master_repl_offset = 0;
1541 
1542     /* Replication partial resync backlog */
1543     server.repl_backlog = NULL;
1544     server.repl_backlog_size = CONFIG_DEFAULT_REPL_BACKLOG_SIZE;
1545     server.repl_backlog_histlen = 0;
1546     server.repl_backlog_idx = 0;
1547     server.repl_backlog_off = 0;
1548     server.repl_backlog_time_limit = CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
1549     server.repl_no_slaves_since = time(NULL);
1550 
1551     /* Client output buffer limits */
1552     for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
1553         server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
1554 
1555     /* Double constants initialization */
1556     R_Zero = 0.0;
1557     R_PosInf = 1.0/R_Zero;
1558     R_NegInf = -1.0/R_Zero;
1559     R_Nan = R_Zero/R_Zero;
1560 
1561     /* Command table -- we initiialize it here as it is part of the
1562      * initial configuration, since command names may be changed via
1563      * redis.conf using the rename-command directive. */
1564     server.commands = dictCreate(&commandTableDictType,NULL);
1565     server.orig_commands = dictCreate(&commandTableDictType,NULL);
1566     populateCommandTable();
1567     server.delCommand = lookupCommandByCString("del");
1568     server.multiCommand = lookupCommandByCString("multi");
1569     server.lpushCommand = lookupCommandByCString("lpush");
1570     server.lpopCommand = lookupCommandByCString("lpop");
1571     server.rpopCommand = lookupCommandByCString("rpop");
1572     server.sremCommand = lookupCommandByCString("srem");
1573     server.execCommand = lookupCommandByCString("exec");
1574 
1575     /* Slow log */
1576     server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;
1577     server.slowlog_max_len = CONFIG_DEFAULT_SLOWLOG_MAX_LEN;
1578 
1579     /* Latency monitor */
1580     server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD;
1581 
1582     /* Debugging */
1583     server.assert_failed = "<no assertion failed>";
1584     server.assert_file = "<no file>";
1585     server.assert_line = 0;
1586     server.bug_report_start = 0;
1587     server.watchdog_period = 0;
1588 }
1589 
1590 extern char **environ;
1591 
1592 /* Restart the server, executing the same executable that started this
1593  * instance, with the same arguments and configuration file.
1594  *
1595  * The function is designed to directly call execve() so that the new
1596  * server instance will retain the PID of the previous one.
1597  *
1598  * The list of flags, that may be bitwise ORed together, alter the
1599  * behavior of this function:
1600  *
1601  * RESTART_SERVER_NONE              No flags.
1602  * RESTART_SERVER_GRACEFULLY        Do a proper shutdown before restarting.
1603  * RESTART_SERVER_CONFIG_REWRITE    Rewrite the config file before restarting.
1604  *
1605  * On success the function does not return, because the process turns into
1606  * a different process. On error C_ERR is returned. */
1607 int restartServer(int flags, mstime_t delay) {
1608     int j;
1609 
1610     /* Check if we still have accesses to the executable that started this
1611      * server instance. */
1612     if (access(server.executable,X_OK) == -1) return C_ERR;
1613 
1614     /* Config rewriting. */
1615     if (flags & RESTART_SERVER_CONFIG_REWRITE &&
1616         server.configfile &&
1617         rewriteConfig(server.configfile) == -1) return C_ERR;
1618 
1619     /* Perform a proper shutdown. */
1620     if (flags & RESTART_SERVER_GRACEFULLY &&
1621         prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK) return C_ERR;
1622 
1623     /* Close all file descriptors, with the exception of stdin, stdout, strerr
1624      * which are useful if we restart a Redis server which is not daemonized. */
1625     for (j = 3; j < (int)server.maxclients + 1024; j++) close(j);
1626 
1627     /* Execute the server with the original command line. */
1628     if (delay) usleep(delay*1000);
1629     execve(server.executable,server.exec_argv,environ);
1630 
1631     /* If an error occurred here, there is nothing we can do, but exit. */
1632     _exit(1);
1633 
1634     return C_ERR; /* Never reached. */
1635 }
1636 
1637 /* This function will try to raise the max number of open files accordingly to
1638  * the configured max number of clients. It also reserves a number of file
1639  * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
1640  * persistence, listening sockets, log files and so forth.
1641  *
1642  * If it will not be possible to set the limit accordingly to the configured
1643  * max number of clients, the function will do the reverse setting
1644  * server.maxclients to the value that we can actually handle. */
1645 void adjustOpenFilesLimit(void) {
1646     rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS;
1647     struct rlimit limit;
1648 
1649     if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {
1650         serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
1651             strerror(errno));
1652         server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS;
1653     } else {
1654         rlim_t oldlimit = limit.rlim_cur;
1655 
1656         /* Set the max number of files if the current limit is not enough
1657          * for our needs. */
1658         if (oldlimit < maxfiles) {
1659             rlim_t bestlimit;
1660             int setrlimit_error = 0;
1661 
1662             /* Try to set the file limit to match 'maxfiles' or at least
1663              * to the higher value supported less than maxfiles. */
1664             bestlimit = maxfiles;
1665             while(bestlimit > oldlimit) {
1666                 rlim_t decr_step = 16;
1667 
1668                 limit.rlim_cur = bestlimit;
1669                 limit.rlim_max = bestlimit;
1670                 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break;
1671                 setrlimit_error = errno;
1672 
1673                 /* We failed to set file limit to 'bestlimit'. Try with a
1674                  * smaller limit decrementing by a few FDs per iteration. */
1675                 if (bestlimit < decr_step) break;
1676                 bestlimit -= decr_step;
1677             }
1678 
1679             /* Assume that the limit we get initially is still valid if
1680              * our last try was even lower. */
1681             if (bestlimit < oldlimit) bestlimit = oldlimit;
1682 
1683             if (bestlimit < maxfiles) {
1684                 int old_maxclients = server.maxclients;
1685                 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS;
1686                 if (server.maxclients < 1) {
1687                     serverLog(LL_WARNING,"Your current 'ulimit -n' "
1688                         "of %llu is not enough for the server to start. "
1689                         "Please increase your open file limit to at least "
1690                         "%llu. Exiting.",
1691                         (unsigned long long) oldlimit,
1692                         (unsigned long long) maxfiles);
1693                     exit(1);
1694                 }
1695                 serverLog(LL_WARNING,"You requested maxclients of %d "
1696                     "requiring at least %llu max file descriptors.",
1697                     old_maxclients,
1698                     (unsigned long long) maxfiles);
1699                 serverLog(LL_WARNING,"Server can't set maximum open files "
1700                     "to %llu because of OS error: %s.",
1701                     (unsigned long long) maxfiles, strerror(setrlimit_error));
1702                 serverLog(LL_WARNING,"Current maximum open files is %llu. "
1703                     "maxclients has been reduced to %d to compensate for "
1704                     "low ulimit. "
1705                     "If you need higher maxclients increase 'ulimit -n'.",
1706                     (unsigned long long) bestlimit, server.maxclients);
1707             } else {
1708                 serverLog(LL_NOTICE,"Increased maximum number of open files "
1709                     "to %llu (it was originally set to %llu).",
1710                     (unsigned long long) maxfiles,
1711                     (unsigned long long) oldlimit);
1712             }
1713         }
1714     }
1715 }
1716 
1717 /* Check that server.tcp_backlog can be actually enforced in Linux according
1718  * to the value of /proc/sys/net/core/somaxconn, or warn about it. */
1719 void checkTcpBacklogSettings(void) {
1720 #ifdef HAVE_PROC_SOMAXCONN
1721     FILE *fp = fopen("/proc/sys/net/core/somaxconn","r");
1722     char buf[1024];
1723     if (!fp) return;
1724     if (fgets(buf,sizeof(buf),fp) != NULL) {
1725         int somaxconn = atoi(buf);
1726         if (somaxconn > 0 && somaxconn < server.tcp_backlog) {
1727             serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, somaxconn);
1728         }
1729     }
1730     fclose(fp);
1731 #endif
1732 }
1733 
1734 /* Initialize a set of file descriptors to listen to the specified 'port'
1735  * binding the addresses specified in the Redis server configuration.
1736  *
1737  * The listening file descriptors are stored in the integer array 'fds'
1738  * and their number is set in '*count'.
1739  *
1740  * The addresses to bind are specified in the global server.bindaddr array
1741  * and their number is server.bindaddr_count. If the server configuration
1742  * contains no specific addresses to bind, this function will try to
1743  * bind * (all addresses) for both the IPv4 and IPv6 protocols.
1744  *
1745  * On success the function returns C_OK.
1746  *
1747  * On error the function returns C_ERR. For the function to be on
1748  * error, at least one of the server.bindaddr addresses was
1749  * impossible to bind, or no bind addresses were specified in the server
1750  * configuration but the function is not able to bind * for at least
1751  * one of the IPv4 or IPv6 protocols. */
1752 int listenToPort(int port, int *fds, int *count) {
1753     int j;
1754 
1755     /* Force binding of 0.0.0.0 if no bind address is specified, always
1756      * entering the loop if j == 0. */
1757     if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
1758     for (j = 0; j < server.bindaddr_count || j == 0; j++) {
1759         if (server.bindaddr[j] == NULL) {
1760             /* Bind * for both IPv6 and IPv4, we enter here only if
1761              * server.bindaddr_count == 0. */
1762             fds[*count] = anetTcp6Server(server.neterr,port,NULL,
1763                 server.tcp_backlog);
1764             if (fds[*count] != ANET_ERR) {
1765                 anetNonBlock(NULL,fds[*count]);
1766                 (*count)++;
1767             }
1768             fds[*count] = anetTcpServer(server.neterr,port,NULL,
1769                 server.tcp_backlog);
1770             if (fds[*count] != ANET_ERR) {
1771                 anetNonBlock(NULL,fds[*count]);
1772                 (*count)++;
1773             }
1774             /* Exit the loop if we were able to bind * on IPv4 or IPv6,
1775              * otherwise fds[*count] will be ANET_ERR and we'll print an
1776              * error and return to the caller with an error. */
1777             if (*count) break;
1778         } else if (strchr(server.bindaddr[j],':')) {
1779             /* Bind IPv6 address. */
1780             fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
1781                 server.tcp_backlog);
1782         } else {
1783             /* Bind IPv4 address. */
1784             fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
1785                 server.tcp_backlog);
1786         }
1787         if (fds[*count] == ANET_ERR) {
1788             serverLog(LL_WARNING,
1789                 "Creating Server TCP listening socket %s:%d: %s",
1790                 server.bindaddr[j] ? server.bindaddr[j] : "*",
1791                 port, server.neterr);
1792             return C_ERR;
1793         }
1794         anetNonBlock(NULL,fds[*count]);
1795         (*count)++;
1796     }
1797     return C_OK;
1798 }
1799 
1800 /* Resets the stats that we expose via INFO or other means that we want
1801  * to reset via CONFIG RESETSTAT. The function is also used in order to
1802  * initialize these fields in initServer() at server startup. */
1803 void resetServerStats(void) {
1804     int j;
1805 
1806     server.stat_numcommands = 0;
1807     server.stat_numconnections = 0;
1808     server.stat_expiredkeys = 0;
1809     server.stat_evictedkeys = 0;
1810     server.stat_keyspace_misses = 0;
1811     server.stat_keyspace_hits = 0;
1812     server.stat_fork_time = 0;
1813     server.stat_fork_rate = 0;
1814     server.stat_rejected_conn = 0;
1815     server.stat_sync_full = 0;
1816     server.stat_sync_partial_ok = 0;
1817     server.stat_sync_partial_err = 0;
1818     for (j = 0; j < STATS_METRIC_COUNT; j++) {
1819         server.inst_metric[j].idx = 0;
1820         server.inst_metric[j].last_sample_time = mstime();
1821         server.inst_metric[j].last_sample_count = 0;
1822         memset(server.inst_metric[j].samples,0,
1823             sizeof(server.inst_metric[j].samples));
1824     }
1825     server.stat_net_input_bytes = 0;
1826     server.stat_net_output_bytes = 0;
1827     server.aof_delayed_fsync = 0;
1828 }
1829 
1830 void initServer(void) {
1831     int j;
1832 
1833     signal(SIGHUP, SIG_IGN);
1834     signal(SIGPIPE, SIG_IGN);
1835     setupSignalHandlers();
1836 
1837     if (server.syslog_enabled) {
1838         openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
1839             server.syslog_facility);
1840     }
1841 
1842     server.pid = getpid();
1843     server.current_client = NULL;
1844     server.clients = listCreate();
1845     server.clients_to_close = listCreate();
1846     server.slaves = listCreate();
1847     server.monitors = listCreate();
1848     server.clients_pending_write = listCreate();
1849     server.slaveseldb = -1; /* Force to emit the first SELECT command. */
1850     server.unblocked_clients = listCreate();
1851     server.ready_keys = listCreate();
1852     server.clients_waiting_acks = listCreate();
1853     server.get_ack_from_slaves = 0;
1854     server.clients_paused = 0;
1855     server.system_memory_size = zmalloc_get_memory_size();
1856 
1857     createSharedObjects();
1858     adjustOpenFilesLimit();
1859     server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
1860     server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1861 
1862     /* Open the TCP listening socket for the user commands. */
1863     if (server.port != 0 &&
1864         listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
1865         exit(1);
1866 
1867     /* Open the listening Unix domain socket. */
1868     if (server.unixsocket != NULL) {
1869         unlink(server.unixsocket); /* don't care if this fails */
1870         server.sofd = anetUnixServer(server.neterr,server.unixsocket,
1871             server.unixsocketperm, server.tcp_backlog);
1872         if (server.sofd == ANET_ERR) {
1873             serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
1874             exit(1);
1875         }
1876         anetNonBlock(NULL,server.sofd);
1877     }
1878 
1879     /* Abort if there are no listening sockets at all. */
1880     if (server.ipfd_count == 0 && server.sofd < 0) {
1881         serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
1882         exit(1);
1883     }
1884 
1885     /* Create the Redis databases, and initialize other internal state. */
1886     for (j = 0; j < server.dbnum; j++) {
1887         server.db[j].dict = dictCreate(&dbDictType,NULL);
1888         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
1889         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
1890         server.db[j].ready_keys = dictCreate(&setDictType,NULL);
1891         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
1892         server.db[j].eviction_pool = evictionPoolAlloc();
1893         server.db[j].id = j;
1894         server.db[j].avg_ttl = 0;
1895     }
1896     server.pubsub_channels = dictCreate(&keylistDictType,NULL);
1897     server.pubsub_patterns = listCreate();
1898     listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
1899     listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
1900     server.cronloops = 0;
1901     server.rdb_child_pid = -1;
1902     server.aof_child_pid = -1;
1903     server.rdb_child_type = RDB_CHILD_TYPE_NONE;
1904     aofRewriteBufferReset();
1905     server.aof_buf = sdsempty();
1906     server.lastsave = time(NULL); /* At startup we consider the DB saved. */
1907     server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
1908     server.rdb_save_time_last = -1;
1909     server.rdb_save_time_start = -1;
1910     server.dirty = 0;
1911     resetServerStats();
1912     /* A few stats we don't want to reset: server startup time, and peak mem. */
1913     server.stat_starttime = time(NULL);
1914     server.stat_peak_memory = 0;
1915     server.resident_set_size = 0;
1916     server.lastbgsave_status = C_OK;
1917     server.aof_last_write_status = C_OK;
1918     server.aof_last_write_errno = 0;
1919     server.repl_good_slaves_count = 0;
1920     updateCachedTime();
1921 
1922     /* Create the serverCron() time event, that's our main way to process
1923      * background operations. */
1924     if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
1925         serverPanic("Can't create the serverCron time event.");
1926         exit(1);
1927     }
1928 
1929     /* Create an event handler for accepting new connections in TCP and Unix
1930      * domain sockets. */
1931     for (j = 0; j < server.ipfd_count; j++) {
1932         if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
1933             acceptTcpHandler,NULL) == AE_ERR)
1934             {
1935                 serverPanic(
1936                     "Unrecoverable error creating server.ipfd file event.");
1937             }
1938     }
1939     if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
1940         acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
1941 
1942     /* Open the AOF file if needed. */
1943     if (server.aof_state == AOF_ON) {
1944         server.aof_fd = open(server.aof_filename,
1945                                O_WRONLY|O_APPEND|O_CREAT,0644);
1946         if (server.aof_fd == -1) {
1947             serverLog(LL_WARNING, "Can't open the append-only file: %s",
1948                 strerror(errno));
1949             exit(1);
1950         }
1951     }
1952 
1953     /* 32 bit instances are limited to 4GB of address space, so if there is
1954      * no explicit limit in the user provided configuration we set a limit
1955      * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
1956      * useless crashes of the Redis instance for out of memory. */
1957     if (server.arch_bits == 32 && server.maxmemory == 0) {
1958         serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
1959         server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
1960         server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
1961     }
1962 
1963     if (server.cluster_enabled) clusterInit();
1964     replicationScriptCacheInit();
1965     scriptingInit(1);
1966     slowlogInit();
1967     latencyMonitorInit();
1968     bioInit();
1969 }
1970 
1971 /* Populates the Redis Command Table starting from the hard coded list
1972  * we have on top of redis.c file. */
1973 void populateCommandTable(void) {
1974     int j;
1975     int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
1976 
1977     for (j = 0; j < numcommands; j++) {
1978         struct redisCommand *c = redisCommandTable+j;
1979         char *f = c->sflags;
1980         int retval1, retval2;
1981 
1982         while(*f != '\0') {
1983             switch(*f) {
1984             case 'w': c->flags |= CMD_WRITE; break;
1985             case 'r': c->flags |= CMD_READONLY; break;
1986             case 'm': c->flags |= CMD_DENYOOM; break;
1987             case 'a': c->flags |= CMD_ADMIN; break;
1988             case 'p': c->flags |= CMD_PUBSUB; break;
1989             case 's': c->flags |= CMD_NOSCRIPT; break;
1990             case 'R': c->flags |= CMD_RANDOM; break;
1991             case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
1992             case 'l': c->flags |= CMD_LOADING; break;
1993             case 't': c->flags |= CMD_STALE; break;
1994             case 'M': c->flags |= CMD_SKIP_MONITOR; break;
1995             case 'k': c->flags |= CMD_ASKING; break;
1996             case 'F': c->flags |= CMD_FAST; break;
1997             default: serverPanic("Unsupported command flag"); break;
1998             }
1999             f++;
2000         }
2001 
2002         retval1 = dictAdd(server.commands, sdsnew(c->name), c);
2003         /* Populate an additional dictionary that will be unaffected
2004          * by rename-command statements in redis.conf. */
2005         retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
2006         serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
2007     }
2008 }
2009 
2010 void resetCommandTableStats(void) {
2011     int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
2012     int j;
2013 
2014     for (j = 0; j < numcommands; j++) {
2015         struct redisCommand *c = redisCommandTable+j;
2016 
2017         c->microseconds = 0;
2018         c->calls = 0;
2019     }
2020 }
2021 
2022 /* ========================== Redis OP Array API ============================ */
2023 
2024 void redisOpArrayInit(redisOpArray *oa) {
2025     oa->ops = NULL;
2026     oa->numops = 0;
2027 }
2028 
2029 int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
2030                        robj **argv, int argc, int target)
2031 {
2032     redisOp *op;
2033 
2034     oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
2035     op = oa->ops+oa->numops;
2036     op->cmd = cmd;
2037     op->dbid = dbid;
2038     op->argv = argv;
2039     op->argc = argc;
2040     op->target = target;
2041     oa->numops++;
2042     return oa->numops;
2043 }
2044 
2045 void redisOpArrayFree(redisOpArray *oa) {
2046     while(oa->numops) {
2047         int j;
2048         redisOp *op;
2049 
2050         oa->numops--;
2051         op = oa->ops+oa->numops;
2052         for (j = 0; j < op->argc; j++)
2053             decrRefCount(op->argv[j]);
2054         zfree(op->argv);
2055     }
2056     zfree(oa->ops);
2057 }
2058 
2059 /* ====================== Commands lookup and execution ===================== */
2060 
2061 struct redisCommand *lookupCommand(sds name) {
2062     return dictFetchValue(server.commands, name);
2063 }
2064 
2065 struct redisCommand *lookupCommandByCString(char *s) {
2066     struct redisCommand *cmd;
2067     sds name = sdsnew(s);
2068 
2069     cmd = dictFetchValue(server.commands, name);
2070     sdsfree(name);
2071     return cmd;
2072 }
2073 
2074 /* Lookup the command in the current table, if not found also check in
2075  * the original table containing the original command names unaffected by
2076  * redis.conf rename-command statement.
2077  *
2078  * This is used by functions rewriting the argument vector such as
2079  * rewriteClientCommandVector() in order to set client->cmd pointer
2080  * correctly even if the command was renamed. */
2081 struct redisCommand *lookupCommandOrOriginal(sds name) {
2082     struct redisCommand *cmd = dictFetchValue(server.commands, name);
2083 
2084     if (!cmd) cmd = dictFetchValue(server.orig_commands,name);
2085     return cmd;
2086 }
2087 
2088 /* Propagate the specified command (in the context of the specified database id)
2089  * to AOF and Slaves.
2090  *
2091  * flags are an xor between:
2092  * + PROPAGATE_NONE (no propagation of command at all)
2093  * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
2094  * + PROPAGATE_REPL (propagate into the replication link)
2095  *
2096  * This should not be used inside commands implementation. Use instead
2097  * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
2098  */
2099 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2100                int flags)
2101 {
2102     if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
2103         feedAppendOnlyFile(cmd,dbid,argv,argc);
2104     if (flags & PROPAGATE_REPL)
2105         replicationFeedSlaves(server.slaves,dbid,argv,argc);
2106 }
2107 
2108 /* Used inside commands to schedule the propagation of additional commands
2109  * after the current command is propagated to AOF / Replication.
2110  *
2111  * 'cmd' must be a pointer to the Redis command to replicate, dbid is the
2112  * database ID the command should be propagated into.
2113  * Arguments of the command to propagte are passed as an array of redis
2114  * objects pointers of len 'argc', using the 'argv' vector.
2115  *
2116  * The function does not take a reference to the passed 'argv' vector,
2117  * so it is up to the caller to release the passed argv (but it is usually
2118  * stack allocated).  The function autoamtically increments ref count of
2119  * passed objects, so the caller does not need to. */
2120 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2121                    int target)
2122 {
2123     robj **argvcopy;
2124     int j;
2125 
2126     if (server.loading) return; /* No propagation during loading. */
2127 
2128     argvcopy = zmalloc(sizeof(robj*)*argc);
2129     for (j = 0; j < argc; j++) {
2130         argvcopy[j] = argv[j];
2131         incrRefCount(argv[j]);
2132     }
2133     redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target);
2134 }
2135 
2136 /* It is possible to call the function forceCommandPropagation() inside a
2137  * Redis command implementation in order to to force the propagation of a
2138  * specific command execution into AOF / Replication. */
2139 void forceCommandPropagation(client *c, int flags) {
2140     if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL;
2141     if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
2142 }
2143 
2144 /* Avoid that the executed command is propagated at all. This way we
2145  * are free to just propagate what we want using the alsoPropagate()
2146  * API. */
2147 void preventCommandPropagation(client *c) {
2148     c->flags |= CLIENT_PREVENT_PROP;
2149 }
2150 
2151 /* AOF specific version of preventCommandPropagation(). */
2152 void preventCommandAOF(client *c) {
2153     c->flags |= CLIENT_PREVENT_AOF_PROP;
2154 }
2155 
2156 /* Replication specific version of preventCommandPropagation(). */
2157 void preventCommandReplication(client *c) {
2158     c->flags |= CLIENT_PREVENT_REPL_PROP;
2159 }
2160 
2161 /* Call() is the core of Redis execution of a command.
2162  *
2163  * The following flags can be passed:
2164  * CMD_CALL_NONE        No flags.
2165  * CMD_CALL_SLOWLOG     Check command speed and log in the slow log if needed.
2166  * CMD_CALL_STATS       Populate command stats.
2167  * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
2168  *                          or if the client flags are forcing propagation.
2169  * CMD_CALL_PROPAGATE_REPL  Send command to salves if it modified the dataset
2170  *                          or if the client flags are forcing propagation.
2171  * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
2172  * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
2173  *
2174  * The exact propagation behavior depends on the client flags.
2175  * Specifically:
2176  *
2177  * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
2178  *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
2179  *    in the call flags, then the command is propagated even if the
2180  *    dataset was not affected by the command.
2181  * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
2182  *    are set, the propagation into AOF or to slaves is not performed even
2183  *    if the command modified the dataset.
2184  *
2185  * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
2186  * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
2187  * slaves propagation will never occur.
2188  *
2189  * Client flags are modified by the implementation of a given command
2190  * using the following API:
2191  *
2192  * forceCommandPropagation(client *c, int flags);
2193  * preventCommandPropagation(client *c);
2194  * preventCommandAOF(client *c);
2195  * preventCommandReplication(client *c);
2196  *
2197  */
2198 void call(client *c, int flags) {
2199     long long dirty, start, duration;
2200     int client_old_flags = c->flags;
2201 
2202     /* Sent the command to clients in MONITOR mode, only if the commands are
2203      * not generated from reading an AOF. */
2204     if (listLength(server.monitors) &&
2205         !server.loading &&
2206         !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
2207     {
2208         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
2209     }
2210 
2211     /* Initialization: clear the flags that must be set by the command on
2212      * demand, and initialize the array for additional commands propagation. */
2213     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2214     redisOpArrayInit(&server.also_propagate);
2215 
2216     /* Call the command. */
2217     dirty = server.dirty;
2218     start = ustime();
2219     c->cmd->proc(c);
2220     duration = ustime()-start;
2221     dirty = server.dirty-dirty;
2222     if (dirty < 0) dirty = 0;
2223 
2224     /* When EVAL is called loading the AOF we don't want commands called
2225      * from Lua to go into the slowlog or to populate statistics. */
2226     if (server.loading && c->flags & CLIENT_LUA)
2227         flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
2228 
2229     /* If the caller is Lua, we want to force the EVAL caller to propagate
2230      * the script if the command flag or client flag are forcing the
2231      * propagation. */
2232     if (c->flags & CLIENT_LUA && server.lua_caller) {
2233         if (c->flags & CLIENT_FORCE_REPL)
2234             server.lua_caller->flags |= CLIENT_FORCE_REPL;
2235         if (c->flags & CLIENT_FORCE_AOF)
2236             server.lua_caller->flags |= CLIENT_FORCE_AOF;
2237     }
2238 
2239     /* Log the command into the Slow log if needed, and populate the
2240      * per-command statistics that we show in INFO commandstats. */
2241     if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
2242         char *latency_event = (c->cmd->flags & CMD_FAST) ?
2243                               "fast-command" : "command";
2244         latencyAddSampleIfNeeded(latency_event,duration/1000);
2245         slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
2246     }
2247     if (flags & CMD_CALL_STATS) {
2248         c->lastcmd->microseconds += duration;
2249         c->lastcmd->calls++;
2250     }
2251 
2252     /* Propagate the command into the AOF and replication link */
2253     if (flags & CMD_CALL_PROPAGATE &&
2254         (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
2255     {
2256         int propagate_flags = PROPAGATE_NONE;
2257 
2258         /* Check if the command operated changes in the data set. If so
2259          * set for replication / AOF propagation. */
2260         if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
2261 
2262         /* If the client forced AOF / replication of the command, set
2263          * the flags regardless of the command effects on the data set. */
2264         if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
2265         if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
2266 
2267         /* However prevent AOF / replication propagation if the command
2268          * implementatino called preventCommandPropagation() or similar,
2269          * or if we don't have the call() flags to do so. */
2270         if (c->flags & CLIENT_PREVENT_REPL_PROP ||
2271             !(flags & CMD_CALL_PROPAGATE_REPL))
2272                 propagate_flags &= ~PROPAGATE_REPL;
2273         if (c->flags & CLIENT_PREVENT_AOF_PROP ||
2274             !(flags & CMD_CALL_PROPAGATE_AOF))
2275                 propagate_flags &= ~PROPAGATE_AOF;
2276 
2277         /* Call propagate() only if at least one of AOF / replication
2278          * propagation is needed. */
2279         if (propagate_flags != PROPAGATE_NONE)
2280             propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
2281     }
2282 
2283     /* Restore the old replication flags, since call() can be executed
2284      * recursively. */
2285     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2286     c->flags |= client_old_flags &
2287         (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2288 
2289     /* Handle the alsoPropagate() API to handle commands that want to propagate
2290      * multiple separated commands. Note that alsoPropagate() is not affected
2291      * by CLIENT_PREVENT_PROP flag. */
2292     if (server.also_propagate.numops) {
2293         int j;
2294         redisOp *rop;
2295 
2296         if (flags & CMD_CALL_PROPAGATE) {
2297             for (j = 0; j < server.also_propagate.numops; j++) {
2298                 rop = &server.also_propagate.ops[j];
2299                 int target = rop->target;
2300                 /* Whatever the command wish is, we honor the call() flags. */
2301                 if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
2302                 if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
2303                 if (target)
2304                     propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
2305             }
2306         }
2307         redisOpArrayFree(&server.also_propagate);
2308     }
2309     server.stat_numcommands++;
2310 }
2311 
2312 /* If this function gets called we already read a whole
2313  * command, arguments are in the client argv/argc fields.
2314  * processCommand() execute the command or prepare the
2315  * server for a bulk read from the client.
2316  *
2317  * If C_OK is returned the client is still alive and valid and
2318  * other operations can be performed by the caller. Otherwise
2319  * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
2320 int processCommand(client *c) {
2321     /* The QUIT command is handled separately. Normal command procs will
2322      * go through checking for replication and QUIT will cause trouble
2323      * when FORCE_REPLICATION is enabled and would be implemented in
2324      * a regular command proc. */
2325     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
2326         addReply(c,shared.ok);
2327         c->flags |= CLIENT_CLOSE_AFTER_REPLY;
2328         return C_ERR;
2329     }
2330 
2331     /* Now lookup the command and check ASAP about trivial error conditions
2332      * such as wrong arity, bad command name and so forth. */
2333     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
2334     if (!c->cmd) {
2335         flagTransaction(c);
2336         addReplyErrorFormat(c,"unknown command '%s'",
2337             (char*)c->argv[0]->ptr);
2338         return C_OK;
2339     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
2340                (c->argc < -c->cmd->arity)) {
2341         flagTransaction(c);
2342         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2343             c->cmd->name);
2344         return C_OK;
2345     }
2346 
2347     /* Check if the user is authenticated */
2348     if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
2349     {
2350         flagTransaction(c);
2351         addReply(c,shared.noautherr);
2352         return C_OK;
2353     }
2354 
2355     /* If cluster is enabled perform the cluster redirection here.
2356      * However we don't perform the redirection if:
2357      * 1) The sender of this command is our master.
2358      * 2) The command has no key arguments. */
2359     if (server.cluster_enabled &&
2360         !(c->flags & CLIENT_MASTER) &&
2361         !(c->flags & CLIENT_LUA &&
2362           server.lua_caller->flags & CLIENT_MASTER) &&
2363         !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
2364     {
2365         int hashslot;
2366 
2367         if (server.cluster->state != CLUSTER_OK) {
2368             flagTransaction(c);
2369             clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
2370             return C_OK;
2371         } else {
2372             int error_code;
2373             clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
2374             if (n == NULL || n != server.cluster->myself) {
2375                 flagTransaction(c);
2376                 clusterRedirectClient(c,n,hashslot,error_code);
2377                 return C_OK;
2378             }
2379         }
2380     }
2381 
2382     /* Handle the maxmemory directive.
2383      *
2384      * First we try to free some memory if possible (if there are volatile
2385      * keys in the dataset). If there are not the only thing we can do
2386      * is returning an error. */
2387     if (server.maxmemory) {
2388         int retval = freeMemoryIfNeeded();
2389         /* freeMemoryIfNeeded may flush slave output buffers. This may result
2390          * into a slave, that may be the active client, to be freed. */
2391         if (server.current_client == NULL) return C_ERR;
2392 
2393         /* It was impossible to free enough memory, and the command the client
2394          * is trying to execute is denied during OOM conditions? Error. */
2395         if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
2396             flagTransaction(c);
2397             addReply(c, shared.oomerr);
2398             return C_OK;
2399         }
2400     }
2401 
2402     /* Don't accept write commands if there are problems persisting on disk
2403      * and if this is a master instance. */
2404     if (((server.stop_writes_on_bgsave_err &&
2405           server.saveparamslen > 0 &&
2406           server.lastbgsave_status == C_ERR) ||
2407           server.aof_last_write_status == C_ERR) &&
2408         server.masterhost == NULL &&
2409         (c->cmd->flags & CMD_WRITE ||
2410          c->cmd->proc == pingCommand))
2411     {
2412         flagTransaction(c);
2413         if (server.aof_last_write_status == C_OK)
2414             addReply(c, shared.bgsaveerr);
2415         else
2416             addReplySds(c,
2417                 sdscatprintf(sdsempty(),
2418                 "-MISCONF Errors writing to the AOF file: %s\r\n",
2419                 strerror(server.aof_last_write_errno)));
2420         return C_OK;
2421     }
2422 
2423     /* Don't accept write commands if there are not enough good slaves and
2424      * user configured the min-slaves-to-write option. */
2425     if (server.masterhost == NULL &&
2426         server.repl_min_slaves_to_write &&
2427         server.repl_min_slaves_max_lag &&
2428         c->cmd->flags & CMD_WRITE &&
2429         server.repl_good_slaves_count < server.repl_min_slaves_to_write)
2430     {
2431         flagTransaction(c);
2432         addReply(c, shared.noreplicaserr);
2433         return C_OK;
2434     }
2435 
2436     /* Don't accept write commands if this is a read only slave. But
2437      * accept write commands if this is our master. */
2438     if (server.masterhost && server.repl_slave_ro &&
2439         !(c->flags & CLIENT_MASTER) &&
2440         c->cmd->flags & CMD_WRITE)
2441     {
2442         addReply(c, shared.roslaveerr);
2443         return C_OK;
2444     }
2445 
2446     /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
2447     if (c->flags & CLIENT_PUBSUB &&
2448         c->cmd->proc != pingCommand &&
2449         c->cmd->proc != subscribeCommand &&
2450         c->cmd->proc != unsubscribeCommand &&
2451         c->cmd->proc != psubscribeCommand &&
2452         c->cmd->proc != punsubscribeCommand) {
2453         addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
2454         return C_OK;
2455     }
2456 
2457     /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
2458      * we are a slave with a broken link with master. */
2459     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
2460         server.repl_serve_stale_data == 0 &&
2461         !(c->cmd->flags & CMD_STALE))
2462     {
2463         flagTransaction(c);
2464         addReply(c, shared.masterdownerr);
2465         return C_OK;
2466     }
2467 
2468     /* Loading DB? Return an error if the command has not the
2469      * CMD_LOADING flag. */
2470     if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
2471         addReply(c, shared.loadingerr);
2472         return C_OK;
2473     }
2474 
2475     /* Lua script too slow? Only allow a limited number of commands. */
2476     if (server.lua_timedout &&
2477           c->cmd->proc != authCommand &&
2478           c->cmd->proc != replconfCommand &&
2479         !(c->cmd->proc == shutdownCommand &&
2480           c->argc == 2 &&
2481           tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
2482         !(c->cmd->proc == scriptCommand &&
2483           c->argc == 2 &&
2484           tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
2485     {
2486         flagTransaction(c);
2487         addReply(c, shared.slowscripterr);
2488         return C_OK;
2489     }
2490 
2491     /* Exec the command */
2492     if (c->flags & CLIENT_MULTI &&
2493         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
2494         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
2495     {
2496         queueMultiCommand(c);
2497         addReply(c,shared.queued);
2498     } else {
2499         call(c,CMD_CALL_FULL);
2500         c->woff = server.master_repl_offset;
2501         if (listLength(server.ready_keys))
2502             handleClientsBlockedOnLists();
2503     }
2504     return C_OK;
2505 }
2506 
2507 /*================================== Shutdown =============================== */
2508 
2509 /* Close listening sockets. Also unlink the unix domain socket if
2510  * unlink_unix_socket is non-zero. */
2511 void closeListeningSockets(int unlink_unix_socket) {
2512     int j;
2513 
2514     for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]);
2515     if (server.sofd != -1) close(server.sofd);
2516     if (server.cluster_enabled)
2517         for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]);
2518     if (unlink_unix_socket && server.unixsocket) {
2519         serverLog(LL_NOTICE,"Removing the unix socket file.");
2520         unlink(server.unixsocket); /* don't care if this fails */
2521     }
2522 }
2523 
2524 int prepareForShutdown(int flags) {
2525     int save = flags & SHUTDOWN_SAVE;
2526     int nosave = flags & SHUTDOWN_NOSAVE;
2527 
2528     serverLog(LL_WARNING,"User requested shutdown...");
2529 
2530     /* Kill all the Lua debugger forked sessions. */
2531     ldbKillForkedSessions();
2532 
2533     /* Kill the saving child if there is a background saving in progress.
2534        We want to avoid race conditions, for instance our saving child may
2535        overwrite the synchronous saving did by SHUTDOWN. */
2536     if (server.rdb_child_pid != -1) {
2537         serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
2538         kill(server.rdb_child_pid,SIGUSR1);
2539         rdbRemoveTempFile(server.rdb_child_pid);
2540     }
2541 
2542     if (server.aof_state != AOF_OFF) {
2543         /* Kill the AOF saving child as the AOF we already have may be longer
2544          * but contains the full dataset anyway. */
2545         if (server.aof_child_pid != -1) {
2546             /* If we have AOF enabled but haven't written the AOF yet, don't
2547              * shutdown or else the dataset will be lost. */
2548             if (server.aof_state == AOF_WAIT_REWRITE) {
2549                 serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
2550                 return C_ERR;
2551             }
2552             serverLog(LL_WARNING,
2553                 "There is a child rewriting the AOF. Killing it!");
2554             kill(server.aof_child_pid,SIGUSR1);
2555         }
2556         /* Append only file: fsync() the AOF and exit */
2557         serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
2558         aof_fsync(server.aof_fd);
2559     }
2560 
2561     /* Create a new RDB file before exiting. */
2562     if ((server.saveparamslen > 0 && !nosave) || save) {
2563         serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
2564         /* Snapshotting. Perform a SYNC SAVE and exit */
2565         if (rdbSave(server.rdb_filename) != C_OK) {
2566             /* Ooops.. error saving! The best we can do is to continue
2567              * operating. Note that if there was a background saving process,
2568              * in the next cron() Redis will be notified that the background
2569              * saving aborted, handling special stuff like slaves pending for
2570              * synchronization... */
2571             serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
2572             return C_ERR;
2573         }
2574     }
2575 
2576     /* Remove the pid file if possible and needed. */
2577     if (server.daemonize || server.pidfile) {
2578         serverLog(LL_NOTICE,"Removing the pid file.");
2579         unlink(server.pidfile);
2580     }
2581 
2582     /* Best effort flush of slave output buffers, so that we hopefully
2583      * send them pending writes. */
2584     flushSlavesOutputBuffers();
2585 
2586     /* Close the listening sockets. Apparently this allows faster restarts. */
2587     closeListeningSockets(1);
2588     serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
2589         server.sentinel_mode ? "Sentinel" : "Redis");
2590     return C_OK;
2591 }
2592 
2593 /*================================== Commands =============================== */
2594 
2595 /* Return zero if strings are the same, non-zero if they are not.
2596  * The comparison is performed in a way that prevents an attacker to obtain
2597  * information about the nature of the strings just monitoring the execution
2598  * time of the function.
2599  *
2600  * Note that limiting the comparison length to strings up to 512 bytes we
2601  * can avoid leaking any information about the password length and any
2602  * possible branch misprediction related leak.
2603  */
2604 int time_independent_strcmp(char *a, char *b) {
2605     char bufa[CONFIG_AUTHPASS_MAX_LEN], bufb[CONFIG_AUTHPASS_MAX_LEN];
2606     /* The above two strlen perform len(a) + len(b) operations where either
2607      * a or b are fixed (our password) length, and the difference is only
2608      * relative to the length of the user provided string, so no information
2609      * leak is possible in the following two lines of code. */
2610     unsigned int alen = strlen(a);
2611     unsigned int blen = strlen(b);
2612     unsigned int j;
2613     int diff = 0;
2614 
2615     /* We can't compare strings longer than our static buffers.
2616      * Note that this will never pass the first test in practical circumstances
2617      * so there is no info leak. */
2618     if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;
2619 
2620     memset(bufa,0,sizeof(bufa));        /* Constant time. */
2621     memset(bufb,0,sizeof(bufb));        /* Constant time. */
2622     /* Again the time of the following two copies is proportional to
2623      * len(a) + len(b) so no info is leaked. */
2624     memcpy(bufa,a,alen);
2625     memcpy(bufb,b,blen);
2626 
2627     /* Always compare all the chars in the two buffers without
2628      * conditional expressions. */
2629     for (j = 0; j < sizeof(bufa); j++) {
2630         diff |= (bufa[j] ^ bufb[j]);
2631     }
2632     /* Length must be equal as well. */
2633     diff |= alen ^ blen;
2634     return diff; /* If zero strings are the same. */
2635 }
2636 
2637 void authCommand(client *c) {
2638     if (!server.requirepass) {
2639         addReplyError(c,"Client sent AUTH, but no password is set");
2640     } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
2641       c->authenticated = 1;
2642       addReply(c,shared.ok);
2643     } else {
2644       c->authenticated = 0;
2645       addReplyError(c,"invalid password");
2646     }
2647 }
2648 
2649 /* The PING command. It works in a different way if the client is in
2650  * in Pub/Sub mode. */
2651 void pingCommand(client *c) {
2652     /* The command takes zero or one arguments. */
2653     if (c->argc > 2) {
2654         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2655             c->cmd->name);
2656         return;
2657     }
2658 
2659     if (c->flags & CLIENT_PUBSUB) {
2660         addReply(c,shared.mbulkhdr[2]);
2661         addReplyBulkCBuffer(c,"pong",4);
2662         if (c->argc == 1)
2663             addReplyBulkCBuffer(c,"",0);
2664         else
2665             addReplyBulk(c,c->argv[1]);
2666     } else {
2667         if (c->argc == 1)
2668             addReply(c,shared.pong);
2669         else
2670             addReplyBulk(c,c->argv[1]);
2671     }
2672 }
2673 
2674 void echoCommand(client *c) {
2675     addReplyBulk(c,c->argv[1]);
2676 }
2677 
2678 void timeCommand(client *c) {
2679     struct timeval tv;
2680 
2681     /* gettimeofday() can only fail if &tv is a bad address so we
2682      * don't check for errors. */
2683     gettimeofday(&tv,NULL);
2684     addReplyMultiBulkLen(c,2);
2685     addReplyBulkLongLong(c,tv.tv_sec);
2686     addReplyBulkLongLong(c,tv.tv_usec);
2687 }
2688 
2689 /* Helper function for addReplyCommand() to output flags. */
2690 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) {
2691     if (cmd->flags & f) {
2692         addReplyStatus(c, reply);
2693         return 1;
2694     }
2695     return 0;
2696 }
2697 
2698 /* Output the representation of a Redis command. Used by the COMMAND command. */
2699 void addReplyCommand(client *c, struct redisCommand *cmd) {
2700     if (!cmd) {
2701         addReply(c, shared.nullbulk);
2702     } else {
2703         /* We are adding: command name, arg count, flags, first, last, offset */
2704         addReplyMultiBulkLen(c, 6);
2705         addReplyBulkCString(c, cmd->name);
2706         addReplyLongLong(c, cmd->arity);
2707 
2708         int flagcount = 0;
2709         void *flaglen = addDeferredMultiBulkLength(c);
2710         flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write");
2711         flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly");
2712         flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom");
2713         flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin");
2714         flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub");
2715         flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript");
2716         flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random");
2717         flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script");
2718         flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading");
2719         flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale");
2720         flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor");
2721         flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
2722         flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
2723         if (cmd->getkeys_proc) {
2724             addReplyStatus(c, "movablekeys");
2725             flagcount += 1;
2726         }
2727         setDeferredMultiBulkLength(c, flaglen, flagcount);
2728 
2729         addReplyLongLong(c, cmd->firstkey);
2730         addReplyLongLong(c, cmd->lastkey);
2731         addReplyLongLong(c, cmd->keystep);
2732     }
2733 }
2734 
2735 /* COMMAND <subcommand> <args> */
2736 void commandCommand(client *c) {
2737     dictIterator *di;
2738     dictEntry *de;
2739 
2740     if (c->argc == 1) {
2741         addReplyMultiBulkLen(c, dictSize(server.commands));
2742         di = dictGetIterator(server.commands);
2743         while ((de = dictNext(di)) != NULL) {
2744             addReplyCommand(c, dictGetVal(de));
2745         }
2746         dictReleaseIterator(di);
2747     } else if (!strcasecmp(c->argv[1]->ptr, "info")) {
2748         int i;
2749         addReplyMultiBulkLen(c, c->argc-2);
2750         for (i = 2; i < c->argc; i++) {
2751             addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr));
2752         }
2753     } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) {
2754         addReplyLongLong(c, dictSize(server.commands));
2755     } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) {
2756         struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
2757         int *keys, numkeys, j;
2758 
2759         if (!cmd) {
2760             addReplyErrorFormat(c,"Invalid command specified");
2761             return;
2762         } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) ||
2763                    ((c->argc-2) < -cmd->arity))
2764         {
2765             addReplyError(c,"Invalid number of arguments specified for command");
2766             return;
2767         }
2768 
2769         keys = getKeysFromCommand(cmd,c->argv+2,c->argc-2,&numkeys);
2770         addReplyMultiBulkLen(c,numkeys);
2771         for (j = 0; j < numkeys; j++) addReplyBulk(c,c->argv[keys[j]+2]);
2772         getKeysFreeResult(keys);
2773     } else {
2774         addReplyError(c, "Unknown subcommand or wrong number of arguments.");
2775         return;
2776     }
2777 }
2778 
2779 /* Convert an amount of bytes into a human readable string in the form
2780  * of 100B, 2G, 100M, 4K, and so forth. */
2781 void bytesToHuman(char *s, unsigned long long n) {
2782     double d;
2783 
2784     if (n < 1024) {
2785         /* Bytes */
2786         sprintf(s,"%lluB",n);
2787         return;
2788     } else if (n < (1024*1024)) {
2789         d = (double)n/(1024);
2790         sprintf(s,"%.2fK",d);
2791     } else if (n < (1024LL*1024*1024)) {
2792         d = (double)n/(1024*1024);
2793         sprintf(s,"%.2fM",d);
2794     } else if (n < (1024LL*1024*1024*1024)) {
2795         d = (double)n/(1024LL*1024*1024);
2796         sprintf(s,"%.2fG",d);
2797     } else if (n < (1024LL*1024*1024*1024*1024)) {
2798         d = (double)n/(1024LL*1024*1024*1024);
2799         sprintf(s,"%.2fT",d);
2800     } else if (n < (1024LL*1024*1024*1024*1024*1024)) {
2801         d = (double)n/(1024LL*1024*1024*1024*1024);
2802         sprintf(s,"%.2fP",d);
2803     } else {
2804         /* Let's hope we never need this */
2805         sprintf(s,"%lluB",n);
2806     }
2807 }
2808 
2809 /* Create the string returned by the INFO command. This is decoupled
2810  * by the INFO command itself as we need to report the same information
2811  * on memory corruption problems. */
2812 sds genRedisInfoString(char *section) {
2813     sds info = sdsempty();
2814     time_t uptime = server.unixtime-server.stat_starttime;
2815     int j, numcommands;
2816     struct rusage self_ru, c_ru;
2817     unsigned long lol, bib;
2818     int allsections = 0, defsections = 0;
2819     int sections = 0;
2820 
2821     if (section == NULL) section = "default";
2822     allsections = strcasecmp(section,"all") == 0;
2823     defsections = strcasecmp(section,"default") == 0;
2824 
2825     getrusage(RUSAGE_SELF, &self_ru);
2826     getrusage(RUSAGE_CHILDREN, &c_ru);
2827     getClientsMaxBuffers(&lol,&bib);
2828 
2829     /* Server */
2830     if (allsections || defsections || !strcasecmp(section,"server")) {
2831         static int call_uname = 1;
2832         static struct utsname name;
2833         char *mode;
2834 
2835         if (server.cluster_enabled) mode = "cluster";
2836         else if (server.sentinel_mode) mode = "sentinel";
2837         else mode = "standalone";
2838 
2839         if (sections++) info = sdscat(info,"\r\n");
2840 
2841         if (call_uname) {
2842             /* Uname can be slow and is always the same output. Cache it. */
2843             uname(&name);
2844             call_uname = 0;
2845         }
2846 
2847         info = sdscatprintf(info,
2848             "# Server\r\n"
2849             "redis_version:%s\r\n"
2850             "redis_git_sha1:%s\r\n"
2851             "redis_git_dirty:%d\r\n"
2852             "redis_build_id:%llx\r\n"
2853             "redis_mode:%s\r\n"
2854             "os:%s %s %s\r\n"
2855             "arch_bits:%d\r\n"
2856             "multiplexing_api:%s\r\n"
2857             "gcc_version:%d.%d.%d\r\n"
2858             "process_id:%ld\r\n"
2859             "run_id:%s\r\n"
2860             "tcp_port:%d\r\n"
2861             "uptime_in_seconds:%jd\r\n"
2862             "uptime_in_days:%jd\r\n"
2863             "hz:%d\r\n"
2864             "lru_clock:%ld\r\n"
2865             "executable:%s\r\n"
2866             "config_file:%s\r\n",
2867             REDIS_VERSION,
2868             redisGitSHA1(),
2869             strtol(redisGitDirty(),NULL,10) > 0,
2870             (unsigned long long) redisBuildId(),
2871             mode,
2872             name.sysname, name.release, name.machine,
2873             server.arch_bits,
2874             aeGetApiName(),
2875 #ifdef __GNUC__
2876             __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
2877 #else
2878             0,0,0,
2879 #endif
2880             (long) getpid(),
2881             server.runid,
2882             server.port,
2883             (intmax_t)uptime,
2884             (intmax_t)(uptime/(3600*24)),
2885             server.hz,
2886             (unsigned long) server.lruclock,
2887             server.executable ? server.executable : "",
2888             server.configfile ? server.configfile : "");
2889     }
2890 
2891     /* Clients */
2892     if (allsections || defsections || !strcasecmp(section,"clients")) {
2893         if (sections++) info = sdscat(info,"\r\n");
2894         info = sdscatprintf(info,
2895             "# Clients\r\n"
2896             "connected_clients:%lu\r\n"
2897             "client_longest_output_list:%lu\r\n"
2898             "client_biggest_input_buf:%lu\r\n"
2899             "blocked_clients:%d\r\n",
2900             listLength(server.clients)-listLength(server.slaves),
2901             lol, bib,
2902             server.bpop_blocked_clients);
2903     }
2904 
2905     /* Memory */
2906     if (allsections || defsections || !strcasecmp(section,"memory")) {
2907         char hmem[64];
2908         char peak_hmem[64];
2909         char total_system_hmem[64];
2910         char used_memory_lua_hmem[64];
2911         char used_memory_rss_hmem[64];
2912         char maxmemory_hmem[64];
2913         size_t zmalloc_used = zmalloc_used_memory();
2914         size_t total_system_mem = server.system_memory_size;
2915         const char *evict_policy = maxmemoryToString();
2916         long long memory_lua = (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024;
2917 
2918         /* Peak memory is updated from time to time by serverCron() so it
2919          * may happen that the instantaneous value is slightly bigger than
2920          * the peak value. This may confuse users, so we update the peak
2921          * if found smaller than the current memory usage. */
2922         if (zmalloc_used > server.stat_peak_memory)
2923             server.stat_peak_memory = zmalloc_used;
2924 
2925         bytesToHuman(hmem,zmalloc_used);
2926         bytesToHuman(peak_hmem,server.stat_peak_memory);
2927         bytesToHuman(total_system_hmem,total_system_mem);
2928         bytesToHuman(used_memory_lua_hmem,memory_lua);
2929         bytesToHuman(used_memory_rss_hmem,server.resident_set_size);
2930         bytesToHuman(maxmemory_hmem,server.maxmemory);
2931 
2932         if (sections++) info = sdscat(info,"\r\n");
2933         info = sdscatprintf(info,
2934             "# Memory\r\n"
2935             "used_memory:%zu\r\n"
2936             "used_memory_human:%s\r\n"
2937             "used_memory_rss:%zu\r\n"
2938             "used_memory_rss_human:%s\r\n"
2939             "used_memory_peak:%zu\r\n"
2940             "used_memory_peak_human:%s\r\n"
2941             "total_system_memory:%lu\r\n"
2942             "total_system_memory_human:%s\r\n"
2943             "used_memory_lua:%lld\r\n"
2944             "used_memory_lua_human:%s\r\n"
2945             "maxmemory:%lld\r\n"
2946             "maxmemory_human:%s\r\n"
2947             "maxmemory_policy:%s\r\n"
2948             "mem_fragmentation_ratio:%.2f\r\n"
2949             "mem_allocator:%s\r\n",
2950             zmalloc_used,
2951             hmem,
2952             server.resident_set_size,
2953             used_memory_rss_hmem,
2954             server.stat_peak_memory,
2955             peak_hmem,
2956             (unsigned long)total_system_mem,
2957             total_system_hmem,
2958             memory_lua,
2959             used_memory_lua_hmem,
2960             server.maxmemory,
2961             maxmemory_hmem,
2962             evict_policy,
2963             zmalloc_get_fragmentation_ratio(server.resident_set_size),
2964             ZMALLOC_LIB
2965             );
2966     }
2967 
2968     /* Persistence */
2969     if (allsections || defsections || !strcasecmp(section,"persistence")) {
2970         if (sections++) info = sdscat(info,"\r\n");
2971         info = sdscatprintf(info,
2972             "# Persistence\r\n"
2973             "loading:%d\r\n"
2974             "rdb_changes_since_last_save:%lld\r\n"
2975             "rdb_bgsave_in_progress:%d\r\n"
2976             "rdb_last_save_time:%jd\r\n"
2977             "rdb_last_bgsave_status:%s\r\n"
2978             "rdb_last_bgsave_time_sec:%jd\r\n"
2979             "rdb_current_bgsave_time_sec:%jd\r\n"
2980             "aof_enabled:%d\r\n"
2981             "aof_rewrite_in_progress:%d\r\n"
2982             "aof_rewrite_scheduled:%d\r\n"
2983             "aof_last_rewrite_time_sec:%jd\r\n"
2984             "aof_current_rewrite_time_sec:%jd\r\n"
2985             "aof_last_bgrewrite_status:%s\r\n"
2986             "aof_last_write_status:%s\r\n",
2987             server.loading,
2988             server.dirty,
2989             server.rdb_child_pid != -1,
2990             (intmax_t)server.lastsave,
2991             (server.lastbgsave_status == C_OK) ? "ok" : "err",
2992             (intmax_t)server.rdb_save_time_last,
2993             (intmax_t)((server.rdb_child_pid == -1) ?
2994                 -1 : time(NULL)-server.rdb_save_time_start),
2995             server.aof_state != AOF_OFF,
2996             server.aof_child_pid != -1,
2997             server.aof_rewrite_scheduled,
2998             (intmax_t)server.aof_rewrite_time_last,
2999             (intmax_t)((server.aof_child_pid == -1) ?
3000                 -1 : time(NULL)-server.aof_rewrite_time_start),
3001             (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
3002             (server.aof_last_write_status == C_OK) ? "ok" : "err");
3003 
3004         if (server.aof_state != AOF_OFF) {
3005             info = sdscatprintf(info,
3006                 "aof_current_size:%lld\r\n"
3007                 "aof_base_size:%lld\r\n"
3008                 "aof_pending_rewrite:%d\r\n"
3009                 "aof_buffer_length:%zu\r\n"
3010                 "aof_rewrite_buffer_length:%lu\r\n"
3011                 "aof_pending_bio_fsync:%llu\r\n"
3012                 "aof_delayed_fsync:%lu\r\n",
3013                 (long long) server.aof_current_size,
3014                 (long long) server.aof_rewrite_base_size,
3015                 server.aof_rewrite_scheduled,
3016                 sdslen(server.aof_buf),
3017                 aofRewriteBufferSize(),
3018                 bioPendingJobsOfType(BIO_AOF_FSYNC),
3019                 server.aof_delayed_fsync);
3020         }
3021 
3022         if (server.loading) {
3023             double perc;
3024             time_t eta, elapsed;
3025             off_t remaining_bytes = server.loading_total_bytes-
3026                                     server.loading_loaded_bytes;
3027 
3028             perc = ((double)server.loading_loaded_bytes /
3029                    (server.loading_total_bytes+1)) * 100;
3030 
3031             elapsed = time(NULL)-server.loading_start_time;
3032             if (elapsed == 0) {
3033                 eta = 1; /* A fake 1 second figure if we don't have
3034                             enough info */
3035             } else {
3036                 eta = (elapsed*remaining_bytes)/(server.loading_loaded_bytes+1);
3037             }
3038 
3039             info = sdscatprintf(info,
3040                 "loading_start_time:%jd\r\n"
3041                 "loading_total_bytes:%llu\r\n"
3042                 "loading_loaded_bytes:%llu\r\n"
3043                 "loading_loaded_perc:%.2f\r\n"
3044                 "loading_eta_seconds:%jd\r\n",
3045                 (intmax_t) server.loading_start_time,
3046                 (unsigned long long) server.loading_total_bytes,
3047                 (unsigned long long) server.loading_loaded_bytes,
3048                 perc,
3049                 (intmax_t)eta
3050             );
3051         }
3052     }
3053 
3054     /* Stats */
3055     if (allsections || defsections || !strcasecmp(section,"stats")) {
3056         if (sections++) info = sdscat(info,"\r\n");
3057         info = sdscatprintf(info,
3058             "# Stats\r\n"
3059             "total_connections_received:%lld\r\n"
3060             "total_commands_processed:%lld\r\n"
3061             "instantaneous_ops_per_sec:%lld\r\n"
3062             "total_net_input_bytes:%lld\r\n"
3063             "total_net_output_bytes:%lld\r\n"
3064             "instantaneous_input_kbps:%.2f\r\n"
3065             "instantaneous_output_kbps:%.2f\r\n"
3066             "rejected_connections:%lld\r\n"
3067             "sync_full:%lld\r\n"
3068             "sync_partial_ok:%lld\r\n"
3069             "sync_partial_err:%lld\r\n"
3070             "expired_keys:%lld\r\n"
3071             "evicted_keys:%lld\r\n"
3072             "keyspace_hits:%lld\r\n"
3073             "keyspace_misses:%lld\r\n"
3074             "pubsub_channels:%ld\r\n"
3075             "pubsub_patterns:%lu\r\n"
3076             "latest_fork_usec:%lld\r\n"
3077             "migrate_cached_sockets:%ld\r\n",
3078             server.stat_numconnections,
3079             server.stat_numcommands,
3080             getInstantaneousMetric(STATS_METRIC_COMMAND),
3081             server.stat_net_input_bytes,
3082             server.stat_net_output_bytes,
3083             (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
3084             (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
3085             server.stat_rejected_conn,
3086             server.stat_sync_full,
3087             server.stat_sync_partial_ok,
3088             server.stat_sync_partial_err,
3089             server.stat_expiredkeys,
3090             server.stat_evictedkeys,
3091             server.stat_keyspace_hits,
3092             server.stat_keyspace_misses,
3093             dictSize(server.pubsub_channels),
3094             listLength(server.pubsub_patterns),
3095             server.stat_fork_time,
3096             dictSize(server.migrate_cached_sockets));
3097     }
3098 
3099     /* Replication */
3100     if (allsections || defsections || !strcasecmp(section,"replication")) {
3101         if (sections++) info = sdscat(info,"\r\n");
3102         info = sdscatprintf(info,
3103             "# Replication\r\n"
3104             "role:%s\r\n",
3105             server.masterhost == NULL ? "master" : "slave");
3106         if (server.masterhost) {
3107             long long slave_repl_offset = 1;
3108 
3109             if (server.master)
3110                 slave_repl_offset = server.master->reploff;
3111             else if (server.cached_master)
3112                 slave_repl_offset = server.cached_master->reploff;
3113 
3114             info = sdscatprintf(info,
3115                 "master_host:%s\r\n"
3116                 "master_port:%d\r\n"
3117                 "master_link_status:%s\r\n"
3118                 "master_last_io_seconds_ago:%d\r\n"
3119                 "master_sync_in_progress:%d\r\n"
3120                 "slave_repl_offset:%lld\r\n"
3121                 ,server.masterhost,
3122                 server.masterport,
3123                 (server.repl_state == REPL_STATE_CONNECTED) ?
3124                     "up" : "down",
3125                 server.master ?
3126                 ((int)(server.unixtime-server.master->lastinteraction)) : -1,
3127                 server.repl_state == REPL_STATE_TRANSFER,
3128                 slave_repl_offset
3129             );
3130 
3131             if (server.repl_state == REPL_STATE_TRANSFER) {
3132                 info = sdscatprintf(info,
3133                     "master_sync_left_bytes:%lld\r\n"
3134                     "master_sync_last_io_seconds_ago:%d\r\n"
3135                     , (long long)
3136                         (server.repl_transfer_size - server.repl_transfer_read),
3137                     (int)(server.unixtime-server.repl_transfer_lastio)
3138                 );
3139             }
3140 
3141             if (server.repl_state != REPL_STATE_CONNECTED) {
3142                 info = sdscatprintf(info,
3143                     "master_link_down_since_seconds:%jd\r\n",
3144                     (intmax_t)server.unixtime-server.repl_down_since);
3145             }
3146             info = sdscatprintf(info,
3147                 "slave_priority:%d\r\n"
3148                 "slave_read_only:%d\r\n",
3149                 server.slave_priority,
3150                 server.repl_slave_ro);
3151         }
3152 
3153         info = sdscatprintf(info,
3154             "connected_slaves:%lu\r\n",
3155             listLength(server.slaves));
3156 
3157         /* If min-slaves-to-write is active, write the number of slaves
3158          * currently considered 'good'. */
3159         if (server.repl_min_slaves_to_write &&
3160             server.repl_min_slaves_max_lag) {
3161             info = sdscatprintf(info,
3162                 "min_slaves_good_slaves:%d\r\n",
3163                 server.repl_good_slaves_count);
3164         }
3165 
3166         if (listLength(server.slaves)) {
3167             int slaveid = 0;
3168             listNode *ln;
3169             listIter li;
3170 
3171             listRewind(server.slaves,&li);
3172             while((ln = listNext(&li))) {
3173                 client *slave = listNodeValue(ln);
3174                 char *state = NULL;
3175                 char ip[NET_IP_STR_LEN];
3176                 int port;
3177                 long lag = 0;
3178 
3179                 if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1) continue;
3180                 switch(slave->replstate) {
3181                 case SLAVE_STATE_WAIT_BGSAVE_START:
3182                 case SLAVE_STATE_WAIT_BGSAVE_END:
3183                     state = "wait_bgsave";
3184                     break;
3185                 case SLAVE_STATE_SEND_BULK:
3186                     state = "send_bulk";
3187                     break;
3188                 case SLAVE_STATE_ONLINE:
3189                     state = "online";
3190                     break;
3191                 }
3192                 if (state == NULL) continue;
3193                 if (slave->replstate == SLAVE_STATE_ONLINE)
3194                     lag = time(NULL) - slave->repl_ack_time;
3195 
3196                 info = sdscatprintf(info,
3197                     "slave%d:ip=%s,port=%d,state=%s,"
3198                     "offset=%lld,lag=%ld\r\n",
3199                     slaveid,ip,slave->slave_listening_port,state,
3200                     slave->repl_ack_off, lag);
3201                 slaveid++;
3202             }
3203         }
3204         info = sdscatprintf(info,
3205             "master_repl_offset:%lld\r\n"
3206             "repl_backlog_active:%d\r\n"
3207             "repl_backlog_size:%lld\r\n"
3208             "repl_backlog_first_byte_offset:%lld\r\n"
3209             "repl_backlog_histlen:%lld\r\n",
3210             server.master_repl_offset,
3211             server.repl_backlog != NULL,
3212             server.repl_backlog_size,
3213             server.repl_backlog_off,
3214             server.repl_backlog_histlen);
3215     }
3216 
3217     /* CPU */
3218     if (allsections || defsections || !strcasecmp(section,"cpu")) {
3219         if (sections++) info = sdscat(info,"\r\n");
3220         info = sdscatprintf(info,
3221         "# CPU\r\n"
3222         "used_cpu_sys:%.2f\r\n"
3223         "used_cpu_user:%.2f\r\n"
3224         "used_cpu_sys_children:%.2f\r\n"
3225         "used_cpu_user_children:%.2f\r\n",
3226         (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000,
3227         (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000,
3228         (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
3229         (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000);
3230     }
3231 
3232     /* cmdtime */
3233     if (allsections || !strcasecmp(section,"commandstats")) {
3234         if (sections++) info = sdscat(info,"\r\n");
3235         info = sdscatprintf(info, "# Commandstats\r\n");
3236         numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
3237         for (j = 0; j < numcommands; j++) {
3238             struct redisCommand *c = redisCommandTable+j;
3239 
3240             if (!c->calls) continue;
3241             info = sdscatprintf(info,
3242                 "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n",
3243                 c->name, c->calls, c->microseconds,
3244                 (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls));
3245         }
3246     }
3247 
3248     /* Cluster */
3249     if (allsections || defsections || !strcasecmp(section,"cluster")) {
3250         if (sections++) info = sdscat(info,"\r\n");
3251         info = sdscatprintf(info,
3252         "# Cluster\r\n"
3253         "cluster_enabled:%d\r\n",
3254         server.cluster_enabled);
3255     }
3256 
3257     /* Key space */
3258     if (allsections || defsections || !strcasecmp(section,"keyspace")) {
3259         if (sections++) info = sdscat(info,"\r\n");
3260         info = sdscatprintf(info, "# Keyspace\r\n");
3261         for (j = 0; j < server.dbnum; j++) {
3262             long long keys, vkeys;
3263 
3264             keys = dictSize(server.db[j].dict);
3265             vkeys = dictSize(server.db[j].expires);
3266             if (keys || vkeys) {
3267                 info = sdscatprintf(info,
3268                     "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
3269                     j, keys, vkeys, server.db[j].avg_ttl);
3270             }
3271         }
3272     }
3273     return info;
3274 }
3275 
3276 void infoCommand(client *c) {
3277     char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
3278 
3279     if (c->argc > 2) {
3280         addReply(c,shared.syntaxerr);
3281         return;
3282     }
3283     addReplyBulkSds(c, genRedisInfoString(section));
3284 }
3285 
3286 void monitorCommand(client *c) {
3287     /* ignore MONITOR if already slave or in monitor mode */
3288     if (c->flags & CLIENT_SLAVE) return;
3289 
3290     c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
3291     listAddNodeTail(server.monitors,c);
3292     addReply(c,shared.ok);
3293 }
3294 
3295 /* ============================ Maxmemory directive  ======================== */
3296 
3297 /* freeMemoryIfNeeded() gets called when 'maxmemory' is set on the config
3298  * file to limit the max memory used by the server, before processing a
3299  * command.
3300  *
3301  * The goal of the function is to free enough memory to keep Redis under the
3302  * configured memory limit.
3303  *
3304  * The function starts calculating how many bytes should be freed to keep
3305  * Redis under the limit, and enters a loop selecting the best keys to
3306  * evict accordingly to the configured policy.
3307  *
3308  * If all the bytes needed to return back under the limit were freed the
3309  * function returns C_OK, otherwise C_ERR is returned, and the caller
3310  * should block the execution of commands that will result in more memory
3311  * used by the server.
3312  *
3313  * ------------------------------------------------------------------------
3314  *
3315  * LRU approximation algorithm
3316  *
3317  * Redis uses an approximation of the LRU algorithm that runs in constant
3318  * memory. Every time there is a key to expire, we sample N keys (with
3319  * N very small, usually in around 5) to populate a pool of best keys to
3320  * evict of M keys (the pool size is defined by MAXMEMORY_EVICTION_POOL_SIZE).
3321  *
3322  * The N keys sampled are added in the pool of good keys to expire (the one
3323  * with an old access time) if they are better than one of the current keys
3324  * in the pool.
3325  *
3326  * After the pool is populated, the best key we have in the pool is expired.
3327  * However note that we don't remove keys from the pool when they are deleted
3328  * so the pool may contain keys that no longer exist.
3329  *
3330  * When we try to evict a key, and all the entries in the pool don't exist
3331  * we populate it again. This time we'll be sure that the pool has at least
3332  * one key that can be evicted, if there is at least one key that can be
3333  * evicted in the whole database. */
3334 
3335 /* Create a new eviction pool. */
3336 struct evictionPoolEntry *evictionPoolAlloc(void) {
3337     struct evictionPoolEntry *ep;
3338     int j;
3339 
3340     ep = zmalloc(sizeof(*ep)*MAXMEMORY_EVICTION_POOL_SIZE);
3341     for (j = 0; j < MAXMEMORY_EVICTION_POOL_SIZE; j++) {
3342         ep[j].idle = 0;
3343         ep[j].key = NULL;
3344     }
3345     return ep;
3346 }
3347 
3348 /* This is an helper function for freeMemoryIfNeeded(), it is used in order
3349  * to populate the evictionPool with a few entries every time we want to
3350  * expire a key. Keys with idle time smaller than one of the current
3351  * keys are added. Keys are always added if there are free entries.
3352  *
3353  * We insert keys on place in ascending order, so keys with the smaller
3354  * idle time are on the left, and keys with the higher idle time on the
3355  * right. */
3356 
3357 #define EVICTION_SAMPLES_ARRAY_SIZE 16
3358 void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
3359     int j, k, count;
3360     dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
3361     dictEntry **samples;
3362 
3363     /* Try to use a static buffer: this function is a big hit...
3364      * Note: it was actually measured that this helps. */
3365     if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
3366         samples = _samples;
3367     } else {
3368         samples = zmalloc(sizeof(samples[0])*server.maxmemory_samples);
3369     }
3370 
3371     count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
3372     for (j = 0; j < count; j++) {
3373         unsigned long long idle;
3374         sds key;
3375         robj *o;
3376         dictEntry *de;
3377 
3378         de = samples[j];
3379         key = dictGetKey(de);
3380         /* If the dictionary we are sampling from is not the main
3381          * dictionary (but the expires one) we need to lookup the key
3382          * again in the key dictionary to obtain the value object. */
3383         if (sampledict != keydict) de = dictFind(keydict, key);
3384         o = dictGetVal(de);
3385         idle = estimateObjectIdleTime(o);
3386 
3387         /* Insert the element inside the pool.
3388          * First, find the first empty bucket or the first populated
3389          * bucket that has an idle time smaller than our idle time. */
3390         k = 0;
3391         while (k < MAXMEMORY_EVICTION_POOL_SIZE &&
3392                pool[k].key &&
3393                pool[k].idle < idle) k++;
3394         if (k == 0 && pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key != NULL) {
3395             /* Can't insert if the element is < the worst element we have
3396              * and there are no empty buckets. */
3397             continue;
3398         } else if (k < MAXMEMORY_EVICTION_POOL_SIZE && pool[k].key == NULL) {
3399             /* Inserting into empty position. No setup needed before insert. */
3400         } else {
3401             /* Inserting in the middle. Now k points to the first element
3402              * greater than the element to insert.  */
3403             if (pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key == NULL) {
3404                 /* Free space on the right? Insert at k shifting
3405                  * all the elements from k to end to the right. */
3406                 memmove(pool+k+1,pool+k,
3407                     sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
3408             } else {
3409                 /* No free space on right? Insert at k-1 */
3410                 k--;
3411                 /* Shift all elements on the left of k (included) to the
3412                  * left, so we discard the element with smaller idle time. */
3413                 sdsfree(pool[0].key);
3414                 memmove(pool,pool+1,sizeof(pool[0])*k);
3415             }
3416         }
3417         pool[k].key = sdsdup(key);
3418         pool[k].idle = idle;
3419     }
3420     if (samples != _samples) zfree(samples);
3421 }
3422 
3423 int freeMemoryIfNeeded(void) {
3424     size_t mem_used, mem_tofree, mem_freed;
3425     int slaves = listLength(server.slaves);
3426     mstime_t latency, eviction_latency;
3427 
3428     /* Remove the size of slaves output buffers and AOF buffer from the
3429      * count of used memory. */
3430     mem_used = zmalloc_used_memory();
3431     if (slaves) {
3432         listIter li;
3433         listNode *ln;
3434 
3435         listRewind(server.slaves,&li);
3436         while((ln = listNext(&li))) {
3437             client *slave = listNodeValue(ln);
3438             unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
3439             if (obuf_bytes > mem_used)
3440                 mem_used = 0;
3441             else
3442                 mem_used -= obuf_bytes;
3443         }
3444     }
3445     if (server.aof_state != AOF_OFF) {
3446         mem_used -= sdslen(server.aof_buf);
3447         mem_used -= aofRewriteBufferSize();
3448     }
3449 
3450     /* Check if we are over the memory limit. */
3451     if (mem_used <= server.maxmemory) return C_OK;
3452 
3453     if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
3454         return C_ERR; /* We need to free memory, but policy forbids. */
3455 
3456     /* Compute how much memory we need to free. */
3457     mem_tofree = mem_used - server.maxmemory;
3458     mem_freed = 0;
3459     latencyStartMonitor(latency);
3460     while (mem_freed < mem_tofree) {
3461         int j, k, keys_freed = 0;
3462 
3463         for (j = 0; j < server.dbnum; j++) {
3464             long bestval = 0; /* just to prevent warning */
3465             sds bestkey = NULL;
3466             dictEntry *de;
3467             redisDb *db = server.db+j;
3468             dict *dict;
3469 
3470             if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
3471                 server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
3472             {
3473                 dict = server.db[j].dict;
3474             } else {
3475                 dict = server.db[j].expires;
3476             }
3477             if (dictSize(dict) == 0) continue;
3478 
3479             /* volatile-random and allkeys-random policy */
3480             if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
3481                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
3482             {
3483                 de = dictGetRandomKey(dict);
3484                 bestkey = dictGetKey(de);
3485             }
3486 
3487             /* volatile-lru and allkeys-lru policy */
3488             else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
3489                 server.maxmemory_policy == MAXMEMORY_VOLATILE_LRU)
3490             {
3491                 struct evictionPoolEntry *pool = db->eviction_pool;
3492 
3493                 while(bestkey == NULL) {
3494                     evictionPoolPopulate(dict, db->dict, db->eviction_pool);
3495                     /* Go backward from best to worst element to evict. */
3496                     for (k = MAXMEMORY_EVICTION_POOL_SIZE-1; k >= 0; k--) {
3497                         if (pool[k].key == NULL) continue;
3498                         de = dictFind(dict,pool[k].key);
3499 
3500                         /* Remove the entry from the pool. */
3501                         sdsfree(pool[k].key);
3502                         /* Shift all elements on its right to left. */
3503                         memmove(pool+k,pool+k+1,
3504                             sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
3505                         /* Clear the element on the right which is empty
3506                          * since we shifted one position to the left.  */
3507                         pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key = NULL;
3508                         pool[MAXMEMORY_EVICTION_POOL_SIZE-1].idle = 0;
3509 
3510                         /* If the key exists, is our pick. Otherwise it is
3511                          * a ghost and we need to try the next element. */
3512                         if (de) {
3513                             bestkey = dictGetKey(de);
3514                             break;
3515                         } else {
3516                             /* Ghost... */
3517                             continue;
3518                         }
3519                     }
3520                 }
3521             }
3522 
3523             /* volatile-ttl */
3524             else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
3525                 for (k = 0; k < server.maxmemory_samples; k++) {
3526                     sds thiskey;
3527                     long thisval;
3528 
3529                     de = dictGetRandomKey(dict);
3530                     thiskey = dictGetKey(de);
3531                     thisval = (long) dictGetVal(de);
3532 
3533                     /* Expire sooner (minor expire unix timestamp) is better
3534                      * candidate for deletion */
3535                     if (bestkey == NULL || thisval < bestval) {
3536                         bestkey = thiskey;
3537                         bestval = thisval;
3538                     }
3539                 }
3540             }
3541 
3542             /* Finally remove the selected key. */
3543             if (bestkey) {
3544                 long long delta;
3545 
3546                 robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
3547                 propagateExpire(db,keyobj);
3548                 /* We compute the amount of memory freed by dbDelete() alone.
3549                  * It is possible that actually the memory needed to propagate
3550                  * the DEL in AOF and replication link is greater than the one
3551                  * we are freeing removing the key, but we can't account for
3552                  * that otherwise we would never exit the loop.
3553                  *
3554                  * AOF and Output buffer memory will be freed eventually so
3555                  * we only care about memory used by the key space. */
3556                 delta = (long long) zmalloc_used_memory();
3557                 latencyStartMonitor(eviction_latency);
3558                 dbDelete(db,keyobj);
3559                 latencyEndMonitor(eviction_latency);
3560                 latencyAddSampleIfNeeded("eviction-del",eviction_latency);
3561                 latencyRemoveNestedEvent(latency,eviction_latency);
3562                 delta -= (long long) zmalloc_used_memory();
3563                 mem_freed += delta;
3564                 server.stat_evictedkeys++;
3565                 notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
3566                     keyobj, db->id);
3567                 decrRefCount(keyobj);
3568                 keys_freed++;
3569 
3570                 /* When the memory to free starts to be big enough, we may
3571                  * start spending so much time here that is impossible to
3572                  * deliver data to the slaves fast enough, so we force the
3573                  * transmission here inside the loop. */
3574                 if (slaves) flushSlavesOutputBuffers();
3575             }
3576         }
3577         if (!keys_freed) {
3578             latencyEndMonitor(latency);
3579             latencyAddSampleIfNeeded("eviction-cycle",latency);
3580             return C_ERR; /* nothing to free... */
3581         }
3582     }
3583     latencyEndMonitor(latency);
3584     latencyAddSampleIfNeeded("eviction-cycle",latency);
3585     return C_OK;
3586 }
3587 
3588 /* =================================== Main! ================================ */
3589 
3590 #ifdef __linux__
3591 int linuxOvercommitMemoryValue(void) {
3592     FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
3593     char buf[64];
3594 
3595     if (!fp) return -1;
3596     if (fgets(buf,64,fp) == NULL) {
3597         fclose(fp);
3598         return -1;
3599     }
3600     fclose(fp);
3601 
3602     return atoi(buf);
3603 }
3604 
3605 void linuxMemoryWarnings(void) {
3606     if (linuxOvercommitMemoryValue() == 0) {
3607         serverLog(LL_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
3608     }
3609     if (THPIsEnabled()) {
3610         serverLog(LL_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.");
3611     }
3612 }
3613 #endif /* __linux__ */
3614 
3615 void createPidFile(void) {
3616     /* If pidfile requested, but no pidfile defined, use
3617      * default pidfile path */
3618     if (!server.pidfile) server.pidfile = zstrdup(CONFIG_DEFAULT_PID_FILE);
3619 
3620     /* Try to write the pid file in a best-effort way. */
3621     FILE *fp = fopen(server.pidfile,"w");
3622     if (fp) {
3623         fprintf(fp,"%d\n",(int)getpid());
3624         fclose(fp);
3625     }
3626 }
3627 
3628 void daemonize(void) {
3629     int fd;
3630 
3631     if (fork() != 0) exit(0); /* parent exits */
3632     setsid(); /* create a new session */
3633 
3634     /* Every output goes to /dev/null. If Redis is daemonized but
3635      * the 'logfile' is set to 'stdout' in the configuration file
3636      * it will not log at all. */
3637     if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
3638         dup2(fd, STDIN_FILENO);
3639         dup2(fd, STDOUT_FILENO);
3640         dup2(fd, STDERR_FILENO);
3641         if (fd > STDERR_FILENO) close(fd);
3642     }
3643 }
3644 
3645 void version(void) {
3646     printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n",
3647         REDIS_VERSION,
3648         redisGitSHA1(),
3649         atoi(redisGitDirty()) > 0,
3650         ZMALLOC_LIB,
3651         sizeof(long) == 4 ? 32 : 64,
3652         (unsigned long long) redisBuildId());
3653     exit(0);
3654 }
3655 
3656 void usage(void) {
3657     fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf] [options]\n");
3658     fprintf(stderr,"       ./redis-server - (read config from stdin)\n");
3659     fprintf(stderr,"       ./redis-server -v or --version\n");
3660     fprintf(stderr,"       ./redis-server -h or --help\n");
3661     fprintf(stderr,"       ./redis-server --test-memory <megabytes>\n\n");
3662     fprintf(stderr,"Examples:\n");
3663     fprintf(stderr,"       ./redis-server (run the server with default conf)\n");
3664     fprintf(stderr,"       ./redis-server /etc/redis/6379.conf\n");
3665     fprintf(stderr,"       ./redis-server --port 7777\n");
3666     fprintf(stderr,"       ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n");
3667     fprintf(stderr,"       ./redis-server /etc/myredis.conf --loglevel verbose\n\n");
3668     fprintf(stderr,"Sentinel mode:\n");
3669     fprintf(stderr,"       ./redis-server /etc/sentinel.conf --sentinel\n");
3670     exit(1);
3671 }
3672 
3673 void redisAsciiArt(void) {
3674 #include "asciilogo.h"
3675     char *buf = zmalloc(1024*16);
3676     char *mode;
3677 
3678     if (server.cluster_enabled) mode = "cluster";
3679     else if (server.sentinel_mode) mode = "sentinel";
3680     else mode = "standalone";
3681 
3682     if (server.syslog_enabled) {
3683         serverLog(LL_NOTICE,
3684             "Redis %s (%s/%d) %s bit, %s mode, port %d, pid %ld ready to start.",
3685             REDIS_VERSION,
3686             redisGitSHA1(),
3687             strtol(redisGitDirty(),NULL,10) > 0,
3688             (sizeof(long) == 8) ? "64" : "32",
3689             mode, server.port,
3690             (long) getpid()
3691         );
3692     } else {
3693         snprintf(buf,1024*16,ascii_logo,
3694             REDIS_VERSION,
3695             redisGitSHA1(),
3696             strtol(redisGitDirty(),NULL,10) > 0,
3697             (sizeof(long) == 8) ? "64" : "32",
3698             mode, server.port,
3699             (long) getpid()
3700         );
3701         serverLogRaw(LL_NOTICE|LL_RAW,buf);
3702     }
3703     zfree(buf);
3704 }
3705 
3706 static void sigShutdownHandler(int sig) {
3707     char *msg;
3708 
3709     switch (sig) {
3710     case SIGINT:
3711         msg = "Received SIGINT scheduling shutdown...";
3712         break;
3713     case SIGTERM:
3714         msg = "Received SIGTERM scheduling shutdown...";
3715         break;
3716     default:
3717         msg = "Received shutdown signal, scheduling shutdown...";
3718     };
3719 
3720     /* SIGINT is often delivered via Ctrl+C in an interactive session.
3721      * If we receive the signal the second time, we interpret this as
3722      * the user really wanting to quit ASAP without waiting to persist
3723      * on disk. */
3724     if (server.shutdown_asap && sig == SIGINT) {
3725         serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
3726         rdbRemoveTempFile(getpid());
3727         exit(1); /* Exit with an error since this was not a clean shutdown. */
3728     } else if (server.loading) {
3729         exit(0);
3730     }
3731 
3732     serverLogFromHandler(LL_WARNING, msg);
3733     server.shutdown_asap = 1;
3734 }
3735 
3736 void setupSignalHandlers(void) {
3737     struct sigaction act;
3738 
3739     /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
3740      * Otherwise, sa_handler is used. */
3741     sigemptyset(&act.sa_mask);
3742     act.sa_flags = 0;
3743     act.sa_handler = sigShutdownHandler;
3744     sigaction(SIGTERM, &act, NULL);
3745     sigaction(SIGINT, &act, NULL);
3746 
3747 #ifdef HAVE_BACKTRACE
3748     sigemptyset(&act.sa_mask);
3749     act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
3750     act.sa_sigaction = sigsegvHandler;
3751     sigaction(SIGSEGV, &act, NULL);
3752     sigaction(SIGBUS, &act, NULL);
3753     sigaction(SIGFPE, &act, NULL);
3754     sigaction(SIGILL, &act, NULL);
3755 #endif
3756     return;
3757 }
3758 
3759 void memtest(size_t megabytes, int passes);
3760 
3761 /* Returns 1 if there is --sentinel among the arguments or if
3762  * argv[0] is exactly "redis-sentinel". */
3763 int checkForSentinelMode(int argc, char **argv) {
3764     int j;
3765 
3766     if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
3767     for (j = 1; j < argc; j++)
3768         if (!strcmp(argv[j],"--sentinel")) return 1;
3769     return 0;
3770 }
3771 
3772 /* Function called at startup to load RDB or AOF file in memory. */
3773 void loadDataFromDisk(void) {
3774     long long start = ustime();
3775     if (server.aof_state == AOF_ON) {
3776         if (loadAppendOnlyFile(server.aof_filename) == C_OK)
3777             serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
3778     } else {
3779         if (rdbLoad(server.rdb_filename) == C_OK) {
3780             serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
3781                 (float)(ustime()-start)/1000000);
3782         } else if (errno != ENOENT) {
3783             serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
3784             exit(1);
3785         }
3786     }
3787 }
3788 
3789 void redisOutOfMemoryHandler(size_t allocation_size) {
3790     serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!",
3791         allocation_size);
3792     serverPanic("Redis aborting for OUT OF MEMORY");
3793 }
3794 
3795 void redisSetProcTitle(char *title) {
3796 #ifdef USE_SETPROCTITLE
3797     char *server_mode = "";
3798     if (server.cluster_enabled) server_mode = " [cluster]";
3799     else if (server.sentinel_mode) server_mode = " [sentinel]";
3800 
3801     setproctitle("%s %s:%d%s",
3802         title,
3803         server.bindaddr_count ? server.bindaddr[0] : "*",
3804         server.port,
3805         server_mode);
3806 #else
3807     UNUSED(title);
3808 #endif
3809 }
3810 
3811 /*
3812  * Check whether systemd or upstart have been used to start redis.
3813  */
3814 
3815 int redisSupervisedUpstart(void) {
3816     const char *upstart_job = getenv("UPSTART_JOB");
3817 
3818     if (!upstart_job) {
3819         serverLog(LL_WARNING,
3820                 "upstart supervision requested, but UPSTART_JOB not found");
3821         return 0;
3822     }
3823 
3824     serverLog(LL_NOTICE, "supervised by upstart, will stop to signal readiness");
3825     raise(SIGSTOP);
3826     unsetenv("UPSTART_JOB");
3827     return 1;
3828 }
3829 
3830 int redisSupervisedSystemd(void) {
3831     const char *notify_socket = getenv("NOTIFY_SOCKET");
3832     int fd = 1;
3833     struct sockaddr_un su;
3834     struct iovec iov;
3835     struct msghdr hdr;
3836     int sendto_flags = 0;
3837 
3838     if (!notify_socket) {
3839         serverLog(LL_WARNING,
3840                 "systemd supervision requested, but NOTIFY_SOCKET not found");
3841         return 0;
3842     }
3843 
3844     if ((strchr("@/", notify_socket[0])) == NULL || strlen(notify_socket) < 2) {
3845         return 0;
3846     }
3847 
3848     serverLog(LL_NOTICE, "supervised by systemd, will signal readiness");
3849     if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) {
3850         serverLog(LL_WARNING,
3851                 "Can't connect to systemd socket %s", notify_socket);
3852         return 0;
3853     }
3854 
3855     memset(&su, 0, sizeof(su));
3856     su.sun_family = AF_UNIX;
3857     strncpy (su.sun_path, notify_socket, sizeof(su.sun_path) -1);
3858     su.sun_path[sizeof(su.sun_path) - 1] = '\0';
3859 
3860     if (notify_socket[0] == '@')
3861         su.sun_path[0] = '\0';
3862 
3863     memset(&iov, 0, sizeof(iov));
3864     iov.iov_base = "READY=1";
3865     iov.iov_len = strlen("READY=1");
3866 
3867     memset(&hdr, 0, sizeof(hdr));
3868     hdr.msg_name = &su;
3869     hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) +
3870         strlen(notify_socket);
3871     hdr.msg_iov = &iov;
3872     hdr.msg_iovlen = 1;
3873 
3874     unsetenv("NOTIFY_SOCKET");
3875 #ifdef HAVE_MSG_NOSIGNAL
3876     sendto_flags |= MSG_NOSIGNAL;
3877 #endif
3878     if (sendmsg(fd, &hdr, sendto_flags) < 0) {
3879         serverLog(LL_WARNING, "Can't send notification to systemd");
3880         close(fd);
3881         return 0;
3882     }
3883     close(fd);
3884     return 1;
3885 }
3886 
3887 int redisIsSupervised(int mode) {
3888     if (mode == SUPERVISED_AUTODETECT) {
3889         const char *upstart_job = getenv("UPSTART_JOB");
3890         const char *notify_socket = getenv("NOTIFY_SOCKET");
3891 
3892         if (upstart_job) {
3893             redisSupervisedUpstart();
3894         } else if (notify_socket) {
3895             redisSupervisedSystemd();
3896         }
3897     } else if (mode == SUPERVISED_UPSTART) {
3898         return redisSupervisedUpstart();
3899     } else if (mode == SUPERVISED_SYSTEMD) {
3900         return redisSupervisedSystemd();
3901     }
3902 
3903     return 0;
3904 }
3905 
3906 
3907 int main(int argc, char **argv) {
3908     struct timeval tv;
3909     int j;
3910 
3911 #ifdef REDIS_TEST
3912     if (argc == 3 && !strcasecmp(argv[1], "test")) {
3913         if (!strcasecmp(argv[2], "ziplist")) {
3914             return ziplistTest(argc, argv);
3915         } else if (!strcasecmp(argv[2], "quicklist")) {
3916             quicklistTest(argc, argv);
3917         } else if (!strcasecmp(argv[2], "intset")) {
3918             return intsetTest(argc, argv);
3919         } else if (!strcasecmp(argv[2], "zipmap")) {
3920             return zipmapTest(argc, argv);
3921         } else if (!strcasecmp(argv[2], "sha1test")) {
3922             return sha1Test(argc, argv);
3923         } else if (!strcasecmp(argv[2], "util")) {
3924             return utilTest(argc, argv);
3925         } else if (!strcasecmp(argv[2], "sds")) {
3926             return sdsTest(argc, argv);
3927         } else if (!strcasecmp(argv[2], "endianconv")) {
3928             return endianconvTest(argc, argv);
3929         } else if (!strcasecmp(argv[2], "crc64")) {
3930             return crc64Test(argc, argv);
3931         }
3932 
3933         return -1; /* test not found */
3934     }
3935 #endif
3936 
3937     /* We need to initialize our libraries, and the server configuration. */
3938 #ifdef INIT_SETPROCTITLE_REPLACEMENT
3939     spt_init(argc, argv);
3940 #endif
3941     setlocale(LC_COLLATE,"");
3942     zmalloc_enable_thread_safeness();
3943     zmalloc_set_oom_handler(redisOutOfMemoryHandler);
3944     srand(time(NULL)^getpid());
3945     gettimeofday(&tv,NULL);
3946     dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
3947     server.sentinel_mode = checkForSentinelMode(argc,argv);
3948     initServerConfig();
3949 
3950     /* Store the executable path and arguments in a safe place in order
3951      * to be able to restart the server later. */
3952     server.executable = getAbsolutePath(argv[0]);
3953     server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
3954     server.exec_argv[argc] = NULL;
3955     for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
3956 
3957     /* We need to init sentinel right now as parsing the configuration file
3958      * in sentinel mode will have the effect of populating the sentinel
3959      * data structures with master nodes to monitor. */
3960     if (server.sentinel_mode) {
3961         initSentinelConfig();
3962         initSentinel();
3963     }
3964 
3965     /* Check if we need to start in redis-check-rdb mode. We just execute
3966      * the program main. However the program is part of the Redis executable
3967      * so that we can easily execute an RDB check on loading errors. */
3968     if (strstr(argv[0],"redis-check-rdb") != NULL)
3969         exit(redis_check_rdb_main(argv,argc));
3970 
3971     if (argc >= 2) {
3972         j = 1; /* First option to parse in argv[] */
3973         sds options = sdsempty();
3974         char *configfile = NULL;
3975 
3976         /* Handle special options --help and --version */
3977         if (strcmp(argv[1], "-v") == 0 ||
3978             strcmp(argv[1], "--version") == 0) version();
3979         if (strcmp(argv[1], "--help") == 0 ||
3980             strcmp(argv[1], "-h") == 0) usage();
3981         if (strcmp(argv[1], "--test-memory") == 0) {
3982             if (argc == 3) {
3983                 memtest(atoi(argv[2]),50);
3984                 exit(0);
3985             } else {
3986                 fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
3987                 fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
3988                 exit(1);
3989             }
3990         }
3991 
3992         /* First argument is the config file name? */
3993         if (argv[j][0] != '-' || argv[j][1] != '-') {
3994             configfile = argv[j];
3995             server.configfile = getAbsolutePath(configfile);
3996             /* Replace the config file in server.exec_argv with
3997              * its absoulte path. */
3998             zfree(server.exec_argv[j]);
3999             server.exec_argv[j] = zstrdup(server.configfile);
4000             j++;
4001         }
4002 
4003         /* All the other options are parsed and conceptually appended to the
4004          * configuration file. For instance --port 6380 will generate the
4005          * string "port 6380\n" to be parsed after the actual file name
4006          * is parsed, if any. */
4007         while(j != argc) {
4008             if (argv[j][0] == '-' && argv[j][1] == '-') {
4009                 /* Option name */
4010                 if (!strcmp(argv[j], "--check-rdb")) {
4011                     /* Argument has no options, need to skip for parsing. */
4012                     j++;
4013                     continue;
4014                 }
4015                 if (sdslen(options)) options = sdscat(options,"\n");
4016                 options = sdscat(options,argv[j]+2);
4017                 options = sdscat(options," ");
4018             } else {
4019                 /* Option argument */
4020                 options = sdscatrepr(options,argv[j],strlen(argv[j]));
4021                 options = sdscat(options," ");
4022             }
4023             j++;
4024         }
4025         if (server.sentinel_mode && configfile && *configfile == '-') {
4026             serverLog(LL_WARNING,
4027                 "Sentinel config from STDIN not allowed.");
4028             serverLog(LL_WARNING,
4029                 "Sentinel needs config file on disk to save state.  Exiting...");
4030             exit(1);
4031         }
4032         resetServerSaveParams();
4033         loadServerConfig(configfile,options);
4034         sdsfree(options);
4035     } else {
4036         serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
4037     }
4038 
4039     server.supervised = redisIsSupervised(server.supervised_mode);
4040     int background = server.daemonize && !server.supervised;
4041     if (background) daemonize();
4042 
4043     initServer();
4044     if (background || server.pidfile) createPidFile();
4045     redisSetProcTitle(argv[0]);
4046     redisAsciiArt();
4047     checkTcpBacklogSettings();
4048 
4049     if (!server.sentinel_mode) {
4050         /* Things not needed when running in Sentinel mode. */
4051         serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION);
4052     #ifdef __linux__
4053         linuxMemoryWarnings();
4054     #endif
4055         loadDataFromDisk();
4056         if (server.cluster_enabled) {
4057             if (verifyClusterConfigWithData() == C_ERR) {
4058                 serverLog(LL_WARNING,
4059                     "You can't have keys in a DB different than DB 0 when in "
4060                     "Cluster mode. Exiting.");
4061                 exit(1);
4062             }
4063         }
4064         if (server.ipfd_count > 0)
4065             serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4066         if (server.sofd > 0)
4067             serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
4068     } else {
4069         sentinelIsRunning();
4070     }
4071 
4072     /* Warning the user about suspicious maxmemory setting. */
4073     if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
4074         serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
4075     }
4076 
4077     aeSetBeforeSleepProc(server.el,beforeSleep);
4078     aeMain(server.el);
4079     aeDeleteEventLoop(server.el);
4080     return 0;
4081 }
4082 
4083 /* The End */
4084