xref: /redis-3.2.3/src/server.c (revision d6bc17c2)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "cluster.h"
32 #include "slowlog.h"
33 #include "bio.h"
34 #include "latency.h"
35 
36 #include <time.h>
37 #include <signal.h>
38 #include <sys/wait.h>
39 #include <errno.h>
40 #include <assert.h>
41 #include <ctype.h>
42 #include <stdarg.h>
43 #include <arpa/inet.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46 #include <sys/time.h>
47 #include <sys/resource.h>
48 #include <sys/uio.h>
49 #include <sys/un.h>
50 #include <limits.h>
51 #include <float.h>
52 #include <math.h>
53 #include <sys/resource.h>
54 #include <sys/utsname.h>
55 #include <locale.h>
56 #include <sys/socket.h>
57 
58 /* Our shared "common" objects */
59 
60 struct sharedObjectsStruct shared;
61 
62 /* Global vars that are actually used as constants. The following double
63  * values are used for double on-disk serialization, and are initialized
64  * at runtime to avoid strange compiler optimizations. */
65 
66 double R_Zero, R_PosInf, R_NegInf, R_Nan;
67 
68 /*================================= Globals ================================= */
69 
70 /* Global vars */
71 struct redisServer server; /* server global state */
72 
73 /* Our command table.
74  *
75  * Every entry is composed of the following fields:
76  *
77  * name: a string representing the command name.
78  * function: pointer to the C function implementing the command.
79  * arity: number of arguments, it is possible to use -N to say >= N
80  * sflags: command flags as string. See below for a table of flags.
81  * flags: flags as bitmask. Computed by Redis using the 'sflags' field.
82  * get_keys_proc: an optional function to get key arguments from a command.
83  *                This is only used when the following three fields are not
84  *                enough to specify what arguments are keys.
85  * first_key_index: first argument that is a key
86  * last_key_index: last argument that is a key
87  * key_step: step to get all the keys from first to last argument. For instance
88  *           in MSET the step is two since arguments are key,val,key,val,...
89  * microseconds: microseconds of total execution time for this command.
90  * calls: total number of calls of this command.
91  *
92  * The flags, microseconds and calls fields are computed by Redis and should
93  * always be set to zero.
94  *
95  * Command flags are expressed using strings where every character represents
96  * a flag. Later the populateCommandTable() function will take care of
97  * populating the real 'flags' field using this characters.
98  *
99  * This is the meaning of the flags:
100  *
101  * w: write command (may modify the key space).
102  * r: read command  (will never modify the key space).
103  * m: may increase memory usage once called. Don't allow if out of memory.
104  * a: admin command, like SAVE or SHUTDOWN.
105  * p: Pub/Sub related command.
106  * f: force replication of this command, regardless of server.dirty.
107  * s: command not allowed in scripts.
108  * R: random command. Command is not deterministic, that is, the same command
109  *    with the same arguments, with the same key space, may have different
110  *    results. For instance SPOP and RANDOMKEY are two random commands.
111  * S: Sort command output array if called from script, so that the output
112  *    is deterministic.
113  * l: Allow command while loading the database.
114  * t: Allow command while a slave has stale data but is not allowed to
115  *    server this data. Normally no command is accepted in this condition
116  *    but just a few.
117  * M: Do not automatically propagate the command on MONITOR.
118  * k: Perform an implicit ASKING for this command, so the command will be
119  *    accepted in cluster mode if the slot is marked as 'importing'.
120  * F: Fast command: O(1) or O(log(N)) command that should never delay
121  *    its execution as long as the kernel scheduler is giving us time.
122  *    Note that commands that may trigger a DEL as a side effect (like SET)
123  *    are not fast commands.
124  */
125 struct redisCommand redisCommandTable[] = {
126     {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
127     {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
128     {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
129     {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
130     {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
131     {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
132     {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
133     {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
134     {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
135     {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
136     {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
137     {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
138     {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
139     {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
140     {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
141     {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
142     {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
143     {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
144     {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
145     {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
146     {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
147     {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
148     {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
149     {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
150     {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
151     {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
152     {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
153     {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
154     {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
155     {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
156     {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
157     {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
158     {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
159     {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
160     {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
161     {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
162     {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
163     {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
164     {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
165     {"spop",spopCommand,-2,"wRsF",0,NULL,1,1,1,0,0},
166     {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
167     {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
168     {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
169     {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
170     {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
171     {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
172     {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
173     {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
174     {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
175     {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
176     {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
177     {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0},
178     {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0},
179     {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0},
180     {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0},
181     {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
182     {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
183     {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
184     {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
185     {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
186     {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
187     {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
188     {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0},
189     {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0},
190     {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
191     {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0},
192     {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0},
193     {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
194     {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
195     {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
196     {"hset",hsetCommand,4,"wmF",0,NULL,1,1,1,0,0},
197     {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
198     {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
199     {"hmset",hmsetCommand,-4,"wm",0,NULL,1,1,1,0,0},
200     {"hmget",hmgetCommand,-3,"r",0,NULL,1,1,1,0,0},
201     {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
202     {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0},
203     {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
204     {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0},
205     {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0},
206     {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
207     {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
208     {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0},
209     {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0},
210     {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
211     {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
212     {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
213     {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0},
214     {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0},
215     {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
216     {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
217     {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
218     {"select",selectCommand,2,"rlF",0,NULL,0,0,0,0,0},
219     {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0},
220     {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0},
221     {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0},
222     {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0},
223     {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0},
224     {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0},
225     {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0},
226     {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
227     {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0},
228     {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0},
229     {"auth",authCommand,2,"rsltF",0,NULL,0,0,0,0,0},
230     {"ping",pingCommand,-1,"rtF",0,NULL,0,0,0,0,0},
231     {"echo",echoCommand,2,"rF",0,NULL,0,0,0,0,0},
232     {"save",saveCommand,1,"ars",0,NULL,0,0,0,0,0},
233     {"bgsave",bgsaveCommand,1,"ar",0,NULL,0,0,0,0,0},
234     {"bgrewriteaof",bgrewriteaofCommand,1,"ar",0,NULL,0,0,0,0,0},
235     {"shutdown",shutdownCommand,-1,"arlt",0,NULL,0,0,0,0,0},
236     {"lastsave",lastsaveCommand,1,"rRF",0,NULL,0,0,0,0,0},
237     {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0},
238     {"multi",multiCommand,1,"rsF",0,NULL,0,0,0,0,0},
239     {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
240     {"discard",discardCommand,1,"rsF",0,NULL,0,0,0,0,0},
241     {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
242     {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
243     {"replconf",replconfCommand,-1,"arslt",0,NULL,0,0,0,0,0},
244     {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
245     {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
246     {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0},
247     {"info",infoCommand,-1,"rlt",0,NULL,0,0,0,0,0},
248     {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
249     {"ttl",ttlCommand,2,"rF",0,NULL,1,1,1,0,0},
250     {"pttl",pttlCommand,2,"rF",0,NULL,1,1,1,0,0},
251     {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
252     {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
253     {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
254     {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
255     {"config",configCommand,-2,"art",0,NULL,0,0,0,0,0},
256     {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
257     {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
258     {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
259     {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
260     {"publish",publishCommand,3,"pltrF",0,NULL,0,0,0,0,0},
261     {"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0},
262     {"watch",watchCommand,-2,"rsF",0,NULL,1,-1,1,0,0},
263     {"unwatch",unwatchCommand,1,"rsF",0,NULL,0,0,0,0,0},
264     {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
265     {"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
266     {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
267     {"migrate",migrateCommand,-6,"w",0,NULL,3,3,1,0,0},
268     {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0},
269     {"readonly",readonlyCommand,1,"rF",0,NULL,0,0,0,0,0},
270     {"readwrite",readwriteCommand,1,"rF",0,NULL,0,0,0,0,0},
271     {"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0},
272     {"object",objectCommand,3,"r",0,NULL,2,2,2,0,0},
273     {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
274     {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
275     {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
276     {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
277     {"script",scriptCommand,-2,"rs",0,NULL,0,0,0,0,0},
278     {"time",timeCommand,1,"rRF",0,NULL,0,0,0,0,0},
279     {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
280     {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
281     {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0},
282     {"wait",waitCommand,3,"rs",0,NULL,0,0,0,0,0},
283     {"command",commandCommand,0,"rlt",0,NULL,0,0,0,0,0},
284     {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
285     {"georadius",georadiusCommand,-6,"r",0,NULL,1,1,1,0,0},
286     {"georadiusbymember",georadiusByMemberCommand,-5,"r",0,NULL,1,1,1,0,0},
287     {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
288     {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
289     {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
290     {"pfselftest",pfselftestCommand,1,"r",0,NULL,0,0,0,0,0},
291     {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
292     {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
293     {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
294     {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
295     {"latency",latencyCommand,-2,"arslt",0,NULL,0,0,0,0,0}
296 };
297 
298 struct evictionPoolEntry *evictionPoolAlloc(void);
299 
300 /*============================ Utility functions ============================ */
301 
302 /* Low level logging. To use only for very big messages, otherwise
303  * serverLog() is to prefer. */
304 void serverLogRaw(int level, const char *msg) {
305     const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING };
306     const char *c = ".-*#";
307     FILE *fp;
308     char buf[64];
309     int rawmode = (level & LL_RAW);
310     int log_to_stdout = server.logfile[0] == '\0';
311 
312     level &= 0xff; /* clear flags */
313     if (level < server.verbosity) return;
314 
315     fp = log_to_stdout ? stdout : fopen(server.logfile,"a");
316     if (!fp) return;
317 
318     if (rawmode) {
319         fprintf(fp,"%s",msg);
320     } else {
321         int off;
322         struct timeval tv;
323         int role_char;
324         pid_t pid = getpid();
325 
326         gettimeofday(&tv,NULL);
327         off = strftime(buf,sizeof(buf),"%d %b %H:%M:%S.",localtime(&tv.tv_sec));
328         snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000);
329         if (server.sentinel_mode) {
330             role_char = 'X'; /* Sentinel. */
331         } else if (pid != server.pid) {
332             role_char = 'C'; /* RDB / AOF writing child. */
333         } else {
334             role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */
335         }
336         fprintf(fp,"%d:%c %s %c %s\n",
337             (int)getpid(),role_char, buf,c[level],msg);
338     }
339     fflush(fp);
340 
341     if (!log_to_stdout) fclose(fp);
342     if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg);
343 }
344 
345 /* Like serverLogRaw() but with printf-alike support. This is the function that
346  * is used across the code. The raw version is only used in order to dump
347  * the INFO output on crash. */
348 void serverLog(int level, const char *fmt, ...) {
349     va_list ap;
350     char msg[LOG_MAX_LEN];
351 
352     if ((level&0xff) < server.verbosity) return;
353 
354     va_start(ap, fmt);
355     vsnprintf(msg, sizeof(msg), fmt, ap);
356     va_end(ap);
357 
358     serverLogRaw(level,msg);
359 }
360 
361 /* Log a fixed message without printf-alike capabilities, in a way that is
362  * safe to call from a signal handler.
363  *
364  * We actually use this only for signals that are not fatal from the point
365  * of view of Redis. Signals that are going to kill the server anyway and
366  * where we need printf-alike features are served by serverLog(). */
367 void serverLogFromHandler(int level, const char *msg) {
368     int fd;
369     int log_to_stdout = server.logfile[0] == '\0';
370     char buf[64];
371 
372     if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize))
373         return;
374     fd = log_to_stdout ? STDOUT_FILENO :
375                          open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644);
376     if (fd == -1) return;
377     ll2string(buf,sizeof(buf),getpid());
378     if (write(fd,buf,strlen(buf)) == -1) goto err;
379     if (write(fd,":signal-handler (",17) == -1) goto err;
380     ll2string(buf,sizeof(buf),time(NULL));
381     if (write(fd,buf,strlen(buf)) == -1) goto err;
382     if (write(fd,") ",2) == -1) goto err;
383     if (write(fd,msg,strlen(msg)) == -1) goto err;
384     if (write(fd,"\n",1) == -1) goto err;
385 err:
386     if (!log_to_stdout) close(fd);
387 }
388 
389 /* Return the UNIX time in microseconds */
390 long long ustime(void) {
391     struct timeval tv;
392     long long ust;
393 
394     gettimeofday(&tv, NULL);
395     ust = ((long long)tv.tv_sec)*1000000;
396     ust += tv.tv_usec;
397     return ust;
398 }
399 
400 /* Return the UNIX time in milliseconds */
401 mstime_t mstime(void) {
402     return ustime()/1000;
403 }
404 
405 /* After an RDB dump or AOF rewrite we exit from children using _exit() instead of
406  * exit(), because the latter may interact with the same file objects used by
407  * the parent process. However if we are testing the coverage normal exit() is
408  * used in order to obtain the right coverage information. */
409 void exitFromChild(int retcode) {
410 #ifdef COVERAGE_TEST
411     exit(retcode);
412 #else
413     _exit(retcode);
414 #endif
415 }
416 
417 /*====================== Hash table type implementation  ==================== */
418 
419 /* This is a hash table type that uses the SDS dynamic strings library as
420  * keys and redis objects as values (objects can hold SDS strings,
421  * lists, sets). */
422 
423 void dictVanillaFree(void *privdata, void *val)
424 {
425     DICT_NOTUSED(privdata);
426     zfree(val);
427 }
428 
429 void dictListDestructor(void *privdata, void *val)
430 {
431     DICT_NOTUSED(privdata);
432     listRelease((list*)val);
433 }
434 
435 int dictSdsKeyCompare(void *privdata, const void *key1,
436         const void *key2)
437 {
438     int l1,l2;
439     DICT_NOTUSED(privdata);
440 
441     l1 = sdslen((sds)key1);
442     l2 = sdslen((sds)key2);
443     if (l1 != l2) return 0;
444     return memcmp(key1, key2, l1) == 0;
445 }
446 
447 /* A case insensitive version used for the command lookup table and other
448  * places where case insensitive non binary-safe comparison is needed. */
449 int dictSdsKeyCaseCompare(void *privdata, const void *key1,
450         const void *key2)
451 {
452     DICT_NOTUSED(privdata);
453 
454     return strcasecmp(key1, key2) == 0;
455 }
456 
457 void dictObjectDestructor(void *privdata, void *val)
458 {
459     DICT_NOTUSED(privdata);
460 
461     if (val == NULL) return; /* Values of swapped out keys as set to NULL */
462     decrRefCount(val);
463 }
464 
465 void dictSdsDestructor(void *privdata, void *val)
466 {
467     DICT_NOTUSED(privdata);
468 
469     sdsfree(val);
470 }
471 
472 int dictObjKeyCompare(void *privdata, const void *key1,
473         const void *key2)
474 {
475     const robj *o1 = key1, *o2 = key2;
476     return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
477 }
478 
479 unsigned int dictObjHash(const void *key) {
480     const robj *o = key;
481     return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
482 }
483 
484 unsigned int dictSdsHash(const void *key) {
485     return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
486 }
487 
488 unsigned int dictSdsCaseHash(const void *key) {
489     return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key));
490 }
491 
492 int dictEncObjKeyCompare(void *privdata, const void *key1,
493         const void *key2)
494 {
495     robj *o1 = (robj*) key1, *o2 = (robj*) key2;
496     int cmp;
497 
498     if (o1->encoding == OBJ_ENCODING_INT &&
499         o2->encoding == OBJ_ENCODING_INT)
500             return o1->ptr == o2->ptr;
501 
502     o1 = getDecodedObject(o1);
503     o2 = getDecodedObject(o2);
504     cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
505     decrRefCount(o1);
506     decrRefCount(o2);
507     return cmp;
508 }
509 
510 unsigned int dictEncObjHash(const void *key) {
511     robj *o = (robj*) key;
512 
513     if (sdsEncodedObject(o)) {
514         return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
515     } else {
516         if (o->encoding == OBJ_ENCODING_INT) {
517             char buf[32];
518             int len;
519 
520             len = ll2string(buf,32,(long)o->ptr);
521             return dictGenHashFunction((unsigned char*)buf, len);
522         } else {
523             unsigned int hash;
524 
525             o = getDecodedObject(o);
526             hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
527             decrRefCount(o);
528             return hash;
529         }
530     }
531 }
532 
533 /* Sets type hash table */
534 dictType setDictType = {
535     dictEncObjHash,            /* hash function */
536     NULL,                      /* key dup */
537     NULL,                      /* val dup */
538     dictEncObjKeyCompare,      /* key compare */
539     dictObjectDestructor, /* key destructor */
540     NULL                       /* val destructor */
541 };
542 
543 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
544 dictType zsetDictType = {
545     dictEncObjHash,            /* hash function */
546     NULL,                      /* key dup */
547     NULL,                      /* val dup */
548     dictEncObjKeyCompare,      /* key compare */
549     dictObjectDestructor, /* key destructor */
550     NULL                       /* val destructor */
551 };
552 
553 /* Db->dict, keys are sds strings, vals are Redis objects. */
554 dictType dbDictType = {
555     dictSdsHash,                /* hash function */
556     NULL,                       /* key dup */
557     NULL,                       /* val dup */
558     dictSdsKeyCompare,          /* key compare */
559     dictSdsDestructor,          /* key destructor */
560     dictObjectDestructor   /* val destructor */
561 };
562 
563 /* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */
564 dictType shaScriptObjectDictType = {
565     dictSdsCaseHash,            /* hash function */
566     NULL,                       /* key dup */
567     NULL,                       /* val dup */
568     dictSdsKeyCaseCompare,      /* key compare */
569     dictSdsDestructor,          /* key destructor */
570     dictObjectDestructor   /* val destructor */
571 };
572 
573 /* Db->expires */
574 dictType keyptrDictType = {
575     dictSdsHash,               /* hash function */
576     NULL,                      /* key dup */
577     NULL,                      /* val dup */
578     dictSdsKeyCompare,         /* key compare */
579     NULL,                      /* key destructor */
580     NULL                       /* val destructor */
581 };
582 
583 /* Command table. sds string -> command struct pointer. */
584 dictType commandTableDictType = {
585     dictSdsCaseHash,           /* hash function */
586     NULL,                      /* key dup */
587     NULL,                      /* val dup */
588     dictSdsKeyCaseCompare,     /* key compare */
589     dictSdsDestructor,         /* key destructor */
590     NULL                       /* val destructor */
591 };
592 
593 /* Hash type hash table (note that small hashes are represented with ziplists) */
594 dictType hashDictType = {
595     dictEncObjHash,             /* hash function */
596     NULL,                       /* key dup */
597     NULL,                       /* val dup */
598     dictEncObjKeyCompare,       /* key compare */
599     dictObjectDestructor,  /* key destructor */
600     dictObjectDestructor   /* val destructor */
601 };
602 
603 /* Keylist hash table type has unencoded redis objects as keys and
604  * lists as values. It's used for blocking operations (BLPOP) and to
605  * map swapped keys to a list of clients waiting for this keys to be loaded. */
606 dictType keylistDictType = {
607     dictObjHash,                /* hash function */
608     NULL,                       /* key dup */
609     NULL,                       /* val dup */
610     dictObjKeyCompare,          /* key compare */
611     dictObjectDestructor,  /* key destructor */
612     dictListDestructor          /* val destructor */
613 };
614 
615 /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
616  * clusterNode structures. */
617 dictType clusterNodesDictType = {
618     dictSdsHash,                /* hash function */
619     NULL,                       /* key dup */
620     NULL,                       /* val dup */
621     dictSdsKeyCompare,          /* key compare */
622     dictSdsDestructor,          /* key destructor */
623     NULL                        /* val destructor */
624 };
625 
626 /* Cluster re-addition blacklist. This maps node IDs to the time
627  * we can re-add this node. The goal is to avoid readding a removed
628  * node for some time. */
629 dictType clusterNodesBlackListDictType = {
630     dictSdsCaseHash,            /* hash function */
631     NULL,                       /* key dup */
632     NULL,                       /* val dup */
633     dictSdsKeyCaseCompare,      /* key compare */
634     dictSdsDestructor,          /* key destructor */
635     NULL                        /* val destructor */
636 };
637 
638 /* Migrate cache dict type. */
639 dictType migrateCacheDictType = {
640     dictSdsHash,                /* hash function */
641     NULL,                       /* key dup */
642     NULL,                       /* val dup */
643     dictSdsKeyCompare,          /* key compare */
644     dictSdsDestructor,          /* key destructor */
645     NULL                        /* val destructor */
646 };
647 
648 /* Replication cached script dict (server.repl_scriptcache_dict).
649  * Keys are sds SHA1 strings, while values are not used at all in the current
650  * implementation. */
651 dictType replScriptCacheDictType = {
652     dictSdsCaseHash,            /* hash function */
653     NULL,                       /* key dup */
654     NULL,                       /* val dup */
655     dictSdsKeyCaseCompare,      /* key compare */
656     dictSdsDestructor,          /* key destructor */
657     NULL                        /* val destructor */
658 };
659 
660 int htNeedsResize(dict *dict) {
661     long long size, used;
662 
663     size = dictSlots(dict);
664     used = dictSize(dict);
665     return (size && used && size > DICT_HT_INITIAL_SIZE &&
666             (used*100/size < HASHTABLE_MIN_FILL));
667 }
668 
669 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
670  * we resize the hash table to save memory */
671 void tryResizeHashTables(int dbid) {
672     if (htNeedsResize(server.db[dbid].dict))
673         dictResize(server.db[dbid].dict);
674     if (htNeedsResize(server.db[dbid].expires))
675         dictResize(server.db[dbid].expires);
676 }
677 
678 /* Our hash table implementation performs rehashing incrementally while
679  * we write/read from the hash table. Still if the server is idle, the hash
680  * table will use two tables for a long time. So we try to use 1 millisecond
681  * of CPU time at every call of this function to perform some rehahsing.
682  *
683  * The function returns 1 if some rehashing was performed, otherwise 0
684  * is returned. */
685 int incrementallyRehash(int dbid) {
686     /* Keys dictionary */
687     if (dictIsRehashing(server.db[dbid].dict)) {
688         dictRehashMilliseconds(server.db[dbid].dict,1);
689         return 1; /* already used our millisecond for this loop... */
690     }
691     /* Expires */
692     if (dictIsRehashing(server.db[dbid].expires)) {
693         dictRehashMilliseconds(server.db[dbid].expires,1);
694         return 1; /* already used our millisecond for this loop... */
695     }
696     return 0;
697 }
698 
699 /* This function is called once a background process of some kind terminates,
700  * as we want to avoid resizing the hash tables when there is a child in order
701  * to play well with copy-on-write (otherwise when a resize happens lots of
702  * memory pages are copied). The goal of this function is to update the ability
703  * for dict.c to resize the hash tables accordingly to the fact we have o not
704  * running childs. */
705 void updateDictResizePolicy(void) {
706     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
707         dictEnableResize();
708     else
709         dictDisableResize();
710 }
711 
712 /* ======================= Cron: called every 100 ms ======================== */
713 
714 /* Helper function for the activeExpireCycle() function.
715  * This function will try to expire the key that is stored in the hash table
716  * entry 'de' of the 'expires' hash table of a Redis database.
717  *
718  * If the key is found to be expired, it is removed from the database and
719  * 1 is returned. Otherwise no operation is performed and 0 is returned.
720  *
721  * When a key is expired, server.stat_expiredkeys is incremented.
722  *
723  * The parameter 'now' is the current time in milliseconds as is passed
724  * to the function to avoid too many gettimeofday() syscalls. */
725 int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
726     long long t = dictGetSignedIntegerVal(de);
727     if (now > t) {
728         sds key = dictGetKey(de);
729         robj *keyobj = createStringObject(key,sdslen(key));
730 
731         propagateExpire(db,keyobj);
732         dbDelete(db,keyobj);
733         notifyKeyspaceEvent(NOTIFY_EXPIRED,
734             "expired",keyobj,db->id);
735         decrRefCount(keyobj);
736         server.stat_expiredkeys++;
737         return 1;
738     } else {
739         return 0;
740     }
741 }
742 
743 /* Try to expire a few timed out keys. The algorithm used is adaptive and
744  * will use few CPU cycles if there are few expiring keys, otherwise
745  * it will get more aggressive to avoid that too much memory is used by
746  * keys that can be removed from the keyspace.
747  *
748  * No more than CRON_DBS_PER_CALL databases are tested at every
749  * iteration.
750  *
751  * This kind of call is used when Redis detects that timelimit_exit is
752  * true, so there is more work to do, and we do it more incrementally from
753  * the beforeSleep() function of the event loop.
754  *
755  * Expire cycle type:
756  *
757  * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
758  * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
759  * microseconds, and is not repeated again before the same amount of time.
760  *
761  * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
762  * executed, where the time limit is a percentage of the REDIS_HZ period
763  * as specified by the REDIS_EXPIRELOOKUPS_TIME_PERC define. */
764 
765 void activeExpireCycle(int type) {
766     /* This function has some global state in order to continue the work
767      * incrementally across calls. */
768     static unsigned int current_db = 0; /* Last DB tested. */
769     static int timelimit_exit = 0;      /* Time limit hit in previous call? */
770     static long long last_fast_cycle = 0; /* When last fast cycle ran. */
771 
772     int j, iteration = 0;
773     int dbs_per_call = CRON_DBS_PER_CALL;
774     long long start = ustime(), timelimit;
775 
776     if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
777         /* Don't start a fast cycle if the previous cycle did not exited
778          * for time limt. Also don't repeat a fast cycle for the same period
779          * as the fast cycle total duration itself. */
780         if (!timelimit_exit) return;
781         if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
782         last_fast_cycle = start;
783     }
784 
785     /* We usually should test CRON_DBS_PER_CALL per iteration, with
786      * two exceptions:
787      *
788      * 1) Don't test more DBs than we have.
789      * 2) If last time we hit the time limit, we want to scan all DBs
790      * in this iteration, as there is work to do in some DB and we don't want
791      * expired keys to use memory for too much time. */
792     if (dbs_per_call > server.dbnum || timelimit_exit)
793         dbs_per_call = server.dbnum;
794 
795     /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
796      * per iteration. Since this function gets called with a frequency of
797      * server.hz times per second, the following is the max amount of
798      * microseconds we can spend in this function. */
799     timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
800     timelimit_exit = 0;
801     if (timelimit <= 0) timelimit = 1;
802 
803     if (type == ACTIVE_EXPIRE_CYCLE_FAST)
804         timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */
805 
806     for (j = 0; j < dbs_per_call; j++) {
807         int expired;
808         redisDb *db = server.db+(current_db % server.dbnum);
809 
810         /* Increment the DB now so we are sure if we run out of time
811          * in the current DB we'll restart from the next. This allows to
812          * distribute the time evenly across DBs. */
813         current_db++;
814 
815         /* Continue to expire if at the end of the cycle more than 25%
816          * of the keys were expired. */
817         do {
818             unsigned long num, slots;
819             long long now, ttl_sum;
820             int ttl_samples;
821 
822             /* If there is nothing to expire try next DB ASAP. */
823             if ((num = dictSize(db->expires)) == 0) {
824                 db->avg_ttl = 0;
825                 break;
826             }
827             slots = dictSlots(db->expires);
828             now = mstime();
829 
830             /* When there are less than 1% filled slots getting random
831              * keys is expensive, so stop here waiting for better times...
832              * The dictionary will be resized asap. */
833             if (num && slots > DICT_HT_INITIAL_SIZE &&
834                 (num*100/slots < 1)) break;
835 
836             /* The main collection cycle. Sample random keys among keys
837              * with an expire set, checking for expired ones. */
838             expired = 0;
839             ttl_sum = 0;
840             ttl_samples = 0;
841 
842             if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
843                 num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
844 
845             while (num--) {
846                 dictEntry *de;
847                 long long ttl;
848 
849                 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
850                 ttl = dictGetSignedIntegerVal(de)-now;
851                 if (activeExpireCycleTryExpire(db,de,now)) expired++;
852                 if (ttl < 0) ttl = 0;
853                 ttl_sum += ttl;
854                 ttl_samples++;
855             }
856 
857             /* Update the average TTL stats for this database. */
858             if (ttl_samples) {
859                 long long avg_ttl = ttl_sum/ttl_samples;
860 
861                 if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
862                 /* Smooth the value averaging with the previous one. */
863                 db->avg_ttl = (db->avg_ttl+avg_ttl)/2;
864             }
865 
866             /* We can't block forever here even if there are many keys to
867              * expire. So after a given amount of milliseconds return to the
868              * caller waiting for the other active expire cycle. */
869             iteration++;
870             if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
871                 long long elapsed = ustime()-start;
872 
873                 latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
874                 if (elapsed > timelimit) timelimit_exit = 1;
875             }
876             if (timelimit_exit) return;
877             /* We don't repeat the cycle if there are less than 25% of keys
878              * found expired in the current DB. */
879         } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
880     }
881 }
882 
883 unsigned int getLRUClock(void) {
884     return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
885 }
886 
887 /* Add a sample to the operations per second array of samples. */
888 void trackInstantaneousMetric(int metric, long long current_reading) {
889     long long t = mstime() - server.inst_metric[metric].last_sample_time;
890     long long ops = current_reading -
891                     server.inst_metric[metric].last_sample_count;
892     long long ops_sec;
893 
894     ops_sec = t > 0 ? (ops*1000/t) : 0;
895 
896     server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
897         ops_sec;
898     server.inst_metric[metric].idx++;
899     server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
900     server.inst_metric[metric].last_sample_time = mstime();
901     server.inst_metric[metric].last_sample_count = current_reading;
902 }
903 
904 /* Return the mean of all the samples. */
905 long long getInstantaneousMetric(int metric) {
906     int j;
907     long long sum = 0;
908 
909     for (j = 0; j < STATS_METRIC_SAMPLES; j++)
910         sum += server.inst_metric[metric].samples[j];
911     return sum / STATS_METRIC_SAMPLES;
912 }
913 
914 /* Check for timeouts. Returns non-zero if the client was terminated.
915  * The function gets the current time in milliseconds as argument since
916  * it gets called multiple times in a loop, so calling gettimeofday() for
917  * each iteration would be costly without any actual gain. */
918 int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
919     time_t now = now_ms/1000;
920 
921     if (server.maxidletime &&
922         !(c->flags & CLIENT_SLAVE) &&    /* no timeout for slaves */
923         !(c->flags & CLIENT_MASTER) &&   /* no timeout for masters */
924         !(c->flags & CLIENT_BLOCKED) &&  /* no timeout for BLPOP */
925         !(c->flags & CLIENT_PUBSUB) &&   /* no timeout for Pub/Sub clients */
926         (now - c->lastinteraction > server.maxidletime))
927     {
928         serverLog(LL_VERBOSE,"Closing idle client");
929         freeClient(c);
930         return 1;
931     } else if (c->flags & CLIENT_BLOCKED) {
932         /* Blocked OPS timeout is handled with milliseconds resolution.
933          * However note that the actual resolution is limited by
934          * server.hz. */
935 
936         if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
937             /* Handle blocking operation specific timeout. */
938             replyToBlockedClientTimedOut(c);
939             unblockClient(c);
940         } else if (server.cluster_enabled) {
941             /* Cluster: handle unblock & redirect of clients blocked
942              * into keys no longer served by this server. */
943             if (clusterRedirectBlockedClientIfNeeded(c))
944                 unblockClient(c);
945         }
946     }
947     return 0;
948 }
949 
950 /* The client query buffer is an sds.c string that can end with a lot of
951  * free space not used, this function reclaims space if needed.
952  *
953  * The function always returns 0 as it never terminates the client. */
954 int clientsCronResizeQueryBuffer(client *c) {
955     size_t querybuf_size = sdsAllocSize(c->querybuf);
956     time_t idletime = server.unixtime - c->lastinteraction;
957 
958     /* There are two conditions to resize the query buffer:
959      * 1) Query buffer is > BIG_ARG and too big for latest peak.
960      * 2) Client is inactive and the buffer is bigger than 1k. */
961     if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
962          (querybuf_size/(c->querybuf_peak+1)) > 2) ||
963          (querybuf_size > 1024 && idletime > 2))
964     {
965         /* Only resize the query buffer if it is actually wasting space. */
966         if (sdsavail(c->querybuf) > 1024) {
967             c->querybuf = sdsRemoveFreeSpace(c->querybuf);
968         }
969     }
970     /* Reset the peak again to capture the peak memory usage in the next
971      * cycle. */
972     c->querybuf_peak = 0;
973     return 0;
974 }
975 
976 #define CLIENTS_CRON_MIN_ITERATIONS 5
977 void clientsCron(void) {
978     /* Make sure to process at least numclients/server.hz of clients
979      * per call. Since this function is called server.hz times per second
980      * we are sure that in the worst case we process all the clients in 1
981      * second. */
982     int numclients = listLength(server.clients);
983     int iterations = numclients/server.hz;
984     mstime_t now = mstime();
985 
986     /* Process at least a few clients while we are at it, even if we need
987      * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
988      * of processing each client once per second. */
989     if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
990         iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
991                      numclients : CLIENTS_CRON_MIN_ITERATIONS;
992 
993     while(listLength(server.clients) && iterations--) {
994         client *c;
995         listNode *head;
996 
997         /* Rotate the list, take the current head, process.
998          * This way if the client must be removed from the list it's the
999          * first element and we don't incur into O(N) computation. */
1000         listRotate(server.clients);
1001         head = listFirst(server.clients);
1002         c = listNodeValue(head);
1003         /* The following functions do different service checks on the client.
1004          * The protocol is that they return non-zero if the client was
1005          * terminated. */
1006         if (clientsCronHandleTimeout(c,now)) continue;
1007         if (clientsCronResizeQueryBuffer(c)) continue;
1008     }
1009 }
1010 
1011 /* This function handles 'background' operations we are required to do
1012  * incrementally in Redis databases, such as active key expiring, resizing,
1013  * rehashing. */
1014 void databasesCron(void) {
1015     /* Expire keys by random sampling. Not required for slaves
1016      * as master will synthesize DELs for us. */
1017     if (server.active_expire_enabled && server.masterhost == NULL)
1018         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
1019 
1020     /* Perform hash tables rehashing if needed, but only if there are no
1021      * other processes saving the DB on disk. Otherwise rehashing is bad
1022      * as will cause a lot of copy-on-write of memory pages. */
1023     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
1024         /* We use global counters so if we stop the computation at a given
1025          * DB we'll be able to start from the successive in the next
1026          * cron loop iteration. */
1027         static unsigned int resize_db = 0;
1028         static unsigned int rehash_db = 0;
1029         int dbs_per_call = CRON_DBS_PER_CALL;
1030         int j;
1031 
1032         /* Don't test more DBs than we have. */
1033         if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
1034 
1035         /* Resize */
1036         for (j = 0; j < dbs_per_call; j++) {
1037             tryResizeHashTables(resize_db % server.dbnum);
1038             resize_db++;
1039         }
1040 
1041         /* Rehash */
1042         if (server.activerehashing) {
1043             for (j = 0; j < dbs_per_call; j++) {
1044                 int work_done = incrementallyRehash(rehash_db % server.dbnum);
1045                 rehash_db++;
1046                 if (work_done) {
1047                     /* If the function did some work, stop here, we'll do
1048                      * more at the next cron loop. */
1049                     break;
1050                 }
1051             }
1052         }
1053     }
1054 }
1055 
1056 /* We take a cached value of the unix time in the global state because with
1057  * virtual memory and aging there is to store the current time in objects at
1058  * every object access, and accuracy is not needed. To access a global var is
1059  * a lot faster than calling time(NULL) */
1060 void updateCachedTime(void) {
1061     server.unixtime = time(NULL);
1062     server.mstime = mstime();
1063 }
1064 
1065 /* This is our timer interrupt, called server.hz times per second.
1066  * Here is where we do a number of things that need to be done asynchronously.
1067  * For instance:
1068  *
1069  * - Active expired keys collection (it is also performed in a lazy way on
1070  *   lookup).
1071  * - Software watchdog.
1072  * - Update some statistic.
1073  * - Incremental rehashing of the DBs hash tables.
1074  * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
1075  * - Clients timeout of different kinds.
1076  * - Replication reconnection.
1077  * - Many more...
1078  *
1079  * Everything directly called here will be called server.hz times per second,
1080  * so in order to throttle execution of things we want to do less frequently
1081  * a macro is used: run_with_period(milliseconds) { .... }
1082  */
1083 
1084 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1085     int j;
1086     UNUSED(eventLoop);
1087     UNUSED(id);
1088     UNUSED(clientData);
1089 
1090     /* Software watchdog: deliver the SIGALRM that will reach the signal
1091      * handler if we don't return here fast enough. */
1092     if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
1093 
1094     /* Update the time cache. */
1095     updateCachedTime();
1096 
1097     run_with_period(100) {
1098         trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
1099         trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
1100                 server.stat_net_input_bytes);
1101         trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
1102                 server.stat_net_output_bytes);
1103     }
1104 
1105     /* We have just LRU_BITS bits per object for LRU information.
1106      * So we use an (eventually wrapping) LRU clock.
1107      *
1108      * Note that even if the counter wraps it's not a big problem,
1109      * everything will still work but some object will appear younger
1110      * to Redis. However for this to happen a given object should never be
1111      * touched for all the time needed to the counter to wrap, which is
1112      * not likely.
1113      *
1114      * Note that you can change the resolution altering the
1115      * LRU_CLOCK_RESOLUTION define. */
1116     server.lruclock = getLRUClock();
1117 
1118     /* Record the max memory used since the server was started. */
1119     if (zmalloc_used_memory() > server.stat_peak_memory)
1120         server.stat_peak_memory = zmalloc_used_memory();
1121 
1122     /* Sample the RSS here since this is a relatively slow call. */
1123     server.resident_set_size = zmalloc_get_rss();
1124 
1125     /* We received a SIGTERM, shutting down here in a safe way, as it is
1126      * not ok doing so inside the signal handler. */
1127     if (server.shutdown_asap) {
1128         if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
1129         serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
1130         server.shutdown_asap = 0;
1131     }
1132 
1133     /* Show some info about non-empty databases */
1134     run_with_period(5000) {
1135         for (j = 0; j < server.dbnum; j++) {
1136             long long size, used, vkeys;
1137 
1138             size = dictSlots(server.db[j].dict);
1139             used = dictSize(server.db[j].dict);
1140             vkeys = dictSize(server.db[j].expires);
1141             if (used || vkeys) {
1142                 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1143                 /* dictPrintStats(server.dict); */
1144             }
1145         }
1146     }
1147 
1148     /* Show information about connected clients */
1149     if (!server.sentinel_mode) {
1150         run_with_period(5000) {
1151             serverLog(LL_VERBOSE,
1152                 "%lu clients connected (%lu slaves), %zu bytes in use",
1153                 listLength(server.clients)-listLength(server.slaves),
1154                 listLength(server.slaves),
1155                 zmalloc_used_memory());
1156         }
1157     }
1158 
1159     /* We need to do a few operations on clients asynchronously. */
1160     clientsCron();
1161 
1162     /* Handle background operations on Redis databases. */
1163     databasesCron();
1164 
1165     /* Start a scheduled AOF rewrite if this was requested by the user while
1166      * a BGSAVE was in progress. */
1167     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
1168         server.aof_rewrite_scheduled)
1169     {
1170         rewriteAppendOnlyFileBackground();
1171     }
1172 
1173     /* Check if a background saving or AOF rewrite in progress terminated. */
1174     if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
1175         ldbPendingChildren())
1176     {
1177         int statloc;
1178         pid_t pid;
1179 
1180         if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1181             int exitcode = WEXITSTATUS(statloc);
1182             int bysignal = 0;
1183 
1184             if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
1185 
1186             if (pid == -1) {
1187                 serverLog(LL_WARNING,"wait3() returned an error: %s. "
1188                     "rdb_child_pid = %d, aof_child_pid = %d",
1189                     strerror(errno),
1190                     (int) server.rdb_child_pid,
1191                     (int) server.aof_child_pid);
1192             } else if (pid == server.rdb_child_pid) {
1193                 backgroundSaveDoneHandler(exitcode,bysignal);
1194             } else if (pid == server.aof_child_pid) {
1195                 backgroundRewriteDoneHandler(exitcode,bysignal);
1196             } else {
1197                 if (!ldbRemoveChild(pid)) {
1198                     serverLog(LL_WARNING,
1199                         "Warning, detected child with unmatched pid: %ld",
1200                         (long)pid);
1201                 }
1202             }
1203             updateDictResizePolicy();
1204         }
1205     } else {
1206         /* If there is not a background saving/rewrite in progress check if
1207          * we have to save/rewrite now */
1208          for (j = 0; j < server.saveparamslen; j++) {
1209             struct saveparam *sp = server.saveparams+j;
1210 
1211             /* Save if we reached the given amount of changes,
1212              * the given amount of seconds, and if the latest bgsave was
1213              * successful or if, in case of an error, at least
1214              * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
1215             if (server.dirty >= sp->changes &&
1216                 server.unixtime-server.lastsave > sp->seconds &&
1217                 (server.unixtime-server.lastbgsave_try >
1218                  CONFIG_BGSAVE_RETRY_DELAY ||
1219                  server.lastbgsave_status == C_OK))
1220             {
1221                 serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
1222                     sp->changes, (int)sp->seconds);
1223                 rdbSaveBackground(server.rdb_filename);
1224                 break;
1225             }
1226          }
1227 
1228          /* Trigger an AOF rewrite if needed */
1229          if (server.rdb_child_pid == -1 &&
1230              server.aof_child_pid == -1 &&
1231              server.aof_rewrite_perc &&
1232              server.aof_current_size > server.aof_rewrite_min_size)
1233          {
1234             long long base = server.aof_rewrite_base_size ?
1235                             server.aof_rewrite_base_size : 1;
1236             long long growth = (server.aof_current_size*100/base) - 100;
1237             if (growth >= server.aof_rewrite_perc) {
1238                 serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
1239                 rewriteAppendOnlyFileBackground();
1240             }
1241          }
1242     }
1243 
1244 
1245     /* AOF postponed flush: Try at every cron cycle if the slow fsync
1246      * completed. */
1247     if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
1248 
1249     /* AOF write errors: in this case we have a buffer to flush as well and
1250      * clear the AOF error in case of success to make the DB writable again,
1251      * however to try every second is enough in case of 'hz' is set to
1252      * an higher frequency. */
1253     run_with_period(1000) {
1254         if (server.aof_last_write_status == C_ERR)
1255             flushAppendOnlyFile(0);
1256     }
1257 
1258     /* Close clients that need to be closed asynchronous */
1259     freeClientsInAsyncFreeQueue();
1260 
1261     /* Clear the paused clients flag if needed. */
1262     clientsArePaused(); /* Don't check return value, just use the side effect. */
1263 
1264     /* Replication cron function -- used to reconnect to master and
1265      * to detect transfer failures. */
1266     run_with_period(1000) replicationCron();
1267 
1268     /* Run the Redis Cluster cron. */
1269     run_with_period(100) {
1270         if (server.cluster_enabled) clusterCron();
1271     }
1272 
1273     /* Run the Sentinel timer if we are in sentinel mode. */
1274     run_with_period(100) {
1275         if (server.sentinel_mode) sentinelTimer();
1276     }
1277 
1278     /* Cleanup expired MIGRATE cached sockets. */
1279     run_with_period(1000) {
1280         migrateCloseTimedoutSockets();
1281     }
1282 
1283     server.cronloops++;
1284     return 1000/server.hz;
1285 }
1286 
1287 /* This function gets called every time Redis is entering the
1288  * main loop of the event driven library, that is, before to sleep
1289  * for ready file descriptors. */
1290 void beforeSleep(struct aeEventLoop *eventLoop) {
1291     UNUSED(eventLoop);
1292 
1293     /* Call the Redis Cluster before sleep function. Note that this function
1294      * may change the state of Redis Cluster (from ok to fail or vice versa),
1295      * so it's a good idea to call it before serving the unblocked clients
1296      * later in this function. */
1297     if (server.cluster_enabled) clusterBeforeSleep();
1298 
1299     /* Run a fast expire cycle (the called function will return
1300      * ASAP if a fast cycle is not needed). */
1301     if (server.active_expire_enabled && server.masterhost == NULL)
1302         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
1303 
1304     /* Send all the slaves an ACK request if at least one client blocked
1305      * during the previous event loop iteration. */
1306     if (server.get_ack_from_slaves) {
1307         robj *argv[3];
1308 
1309         argv[0] = createStringObject("REPLCONF",8);
1310         argv[1] = createStringObject("GETACK",6);
1311         argv[2] = createStringObject("*",1); /* Not used argument. */
1312         replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
1313         decrRefCount(argv[0]);
1314         decrRefCount(argv[1]);
1315         decrRefCount(argv[2]);
1316         server.get_ack_from_slaves = 0;
1317     }
1318 
1319     /* Unblock all the clients blocked for synchronous replication
1320      * in WAIT. */
1321     if (listLength(server.clients_waiting_acks))
1322         processClientsWaitingReplicas();
1323 
1324     /* Try to process pending commands for clients that were just unblocked. */
1325     if (listLength(server.unblocked_clients))
1326         processUnblockedClients();
1327 
1328     /* Write the AOF buffer on disk */
1329     flushAppendOnlyFile(0);
1330 
1331     /* Handle writes with pending output buffers. */
1332     handleClientsWithPendingWrites();
1333 }
1334 
1335 /* =========================== Server initialization ======================== */
1336 
1337 void createSharedObjects(void) {
1338     int j;
1339 
1340     shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
1341     shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
1342     shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
1343     shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
1344     shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
1345     shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
1346     shared.cnegone = createObject(OBJ_STRING,sdsnew(":-1\r\n"));
1347     shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
1348     shared.nullmultibulk = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
1349     shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n"));
1350     shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
1351     shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
1352     shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
1353     shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
1354         "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
1355     shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
1356         "-ERR no such key\r\n"));
1357     shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
1358         "-ERR syntax error\r\n"));
1359     shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
1360         "-ERR source and destination objects are the same\r\n"));
1361     shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
1362         "-ERR index out of range\r\n"));
1363     shared.noscripterr = createObject(OBJ_STRING,sdsnew(
1364         "-NOSCRIPT No matching script. Please use EVAL.\r\n"));
1365     shared.loadingerr = createObject(OBJ_STRING,sdsnew(
1366         "-LOADING Redis is loading the dataset in memory\r\n"));
1367     shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
1368         "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
1369     shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
1370         "-MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n"));
1371     shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
1372         "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n"));
1373     shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
1374         "-READONLY You can't write against a read only slave.\r\n"));
1375     shared.noautherr = createObject(OBJ_STRING,sdsnew(
1376         "-NOAUTH Authentication required.\r\n"));
1377     shared.oomerr = createObject(OBJ_STRING,sdsnew(
1378         "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
1379     shared.execaborterr = createObject(OBJ_STRING,sdsnew(
1380         "-EXECABORT Transaction discarded because of previous errors.\r\n"));
1381     shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
1382         "-NOREPLICAS Not enough good slaves to write.\r\n"));
1383     shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
1384         "-BUSYKEY Target key name already exists.\r\n"));
1385     shared.space = createObject(OBJ_STRING,sdsnew(" "));
1386     shared.colon = createObject(OBJ_STRING,sdsnew(":"));
1387     shared.plus = createObject(OBJ_STRING,sdsnew("+"));
1388 
1389     for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
1390         char dictid_str[64];
1391         int dictid_len;
1392 
1393         dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
1394         shared.select[j] = createObject(OBJ_STRING,
1395             sdscatprintf(sdsempty(),
1396                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
1397                 dictid_len, dictid_str));
1398     }
1399     shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
1400     shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
1401     shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
1402     shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
1403     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
1404     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
1405     shared.del = createStringObject("DEL",3);
1406     shared.rpop = createStringObject("RPOP",4);
1407     shared.lpop = createStringObject("LPOP",4);
1408     shared.lpush = createStringObject("LPUSH",5);
1409     for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
1410         shared.integers[j] = createObject(OBJ_STRING,(void*)(long)j);
1411         shared.integers[j]->encoding = OBJ_ENCODING_INT;
1412     }
1413     for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {
1414         shared.mbulkhdr[j] = createObject(OBJ_STRING,
1415             sdscatprintf(sdsempty(),"*%d\r\n",j));
1416         shared.bulkhdr[j] = createObject(OBJ_STRING,
1417             sdscatprintf(sdsempty(),"$%d\r\n",j));
1418     }
1419     /* The following two shared objects, minstring and maxstrings, are not
1420      * actually used for their value but as a special object meaning
1421      * respectively the minimum possible string and the maximum possible
1422      * string in string comparisons for the ZRANGEBYLEX command. */
1423     shared.minstring = createStringObject("minstring",9);
1424     shared.maxstring = createStringObject("maxstring",9);
1425 }
1426 
1427 void initServerConfig(void) {
1428     int j;
1429 
1430     getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
1431     server.configfile = NULL;
1432     server.executable = NULL;
1433     server.hz = CONFIG_DEFAULT_HZ;
1434     server.runid[CONFIG_RUN_ID_SIZE] = '\0';
1435     server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
1436     server.port = CONFIG_DEFAULT_SERVER_PORT;
1437     server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
1438     server.bindaddr_count = 0;
1439     server.unixsocket = NULL;
1440     server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
1441     server.ipfd_count = 0;
1442     server.sofd = -1;
1443     server.dbnum = CONFIG_DEFAULT_DBNUM;
1444     server.verbosity = CONFIG_DEFAULT_VERBOSITY;
1445     server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
1446     server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE;
1447     server.active_expire_enabled = 1;
1448     server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
1449     server.saveparams = NULL;
1450     server.loading = 0;
1451     server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);
1452     server.syslog_enabled = CONFIG_DEFAULT_SYSLOG_ENABLED;
1453     server.syslog_ident = zstrdup(CONFIG_DEFAULT_SYSLOG_IDENT);
1454     server.syslog_facility = LOG_LOCAL0;
1455     server.daemonize = CONFIG_DEFAULT_DAEMONIZE;
1456     server.supervised = 0;
1457     server.supervised_mode = SUPERVISED_NONE;
1458     server.aof_state = AOF_OFF;
1459     server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC;
1460     server.aof_no_fsync_on_rewrite = CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE;
1461     server.aof_rewrite_perc = AOF_REWRITE_PERC;
1462     server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
1463     server.aof_rewrite_base_size = 0;
1464     server.aof_rewrite_scheduled = 0;
1465     server.aof_last_fsync = time(NULL);
1466     server.aof_rewrite_time_last = -1;
1467     server.aof_rewrite_time_start = -1;
1468     server.aof_lastbgrewrite_status = C_OK;
1469     server.aof_delayed_fsync = 0;
1470     server.aof_fd = -1;
1471     server.aof_selected_db = -1; /* Make sure the first time will not match */
1472     server.aof_flush_postponed_start = 0;
1473     server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
1474     server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
1475     server.pidfile = NULL;
1476     server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
1477     server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
1478     server.requirepass = NULL;
1479     server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION;
1480     server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM;
1481     server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
1482     server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING;
1483     server.notify_keyspace_events = 0;
1484     server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
1485     server.bpop_blocked_clients = 0;
1486     server.maxmemory = CONFIG_DEFAULT_MAXMEMORY;
1487     server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY;
1488     server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES;
1489     server.hash_max_ziplist_entries = OBJ_HASH_MAX_ZIPLIST_ENTRIES;
1490     server.hash_max_ziplist_value = OBJ_HASH_MAX_ZIPLIST_VALUE;
1491     server.list_max_ziplist_size = OBJ_LIST_MAX_ZIPLIST_SIZE;
1492     server.list_compress_depth = OBJ_LIST_COMPRESS_DEPTH;
1493     server.set_max_intset_entries = OBJ_SET_MAX_INTSET_ENTRIES;
1494     server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES;
1495     server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE;
1496     server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES;
1497     server.shutdown_asap = 0;
1498     server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
1499     server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
1500     server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE;
1501     server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG;
1502     server.cluster_enabled = 0;
1503     server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT;
1504     server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER;
1505     server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY;
1506     server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE;
1507     server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
1508     server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
1509     server.next_client_id = 1; /* Client IDs, start from 1 .*/
1510     server.loading_process_events_interval_bytes = (1024*1024*2);
1511 
1512     server.lruclock = getLRUClock();
1513     resetServerSaveParams();
1514 
1515     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
1516     appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
1517     appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1518     /* Replication related */
1519     server.masterauth = NULL;
1520     server.masterhost = NULL;
1521     server.masterport = 6379;
1522     server.master = NULL;
1523     server.cached_master = NULL;
1524     server.repl_master_initial_offset = -1;
1525     server.repl_state = REPL_STATE_NONE;
1526     server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
1527     server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
1528     server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
1529     server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
1530     server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
1531     server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
1532     server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
1533     server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY;
1534     server.master_repl_offset = 0;
1535 
1536     /* Replication partial resync backlog */
1537     server.repl_backlog = NULL;
1538     server.repl_backlog_size = CONFIG_DEFAULT_REPL_BACKLOG_SIZE;
1539     server.repl_backlog_histlen = 0;
1540     server.repl_backlog_idx = 0;
1541     server.repl_backlog_off = 0;
1542     server.repl_backlog_time_limit = CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
1543     server.repl_no_slaves_since = time(NULL);
1544 
1545     /* Client output buffer limits */
1546     for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
1547         server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
1548 
1549     /* Double constants initialization */
1550     R_Zero = 0.0;
1551     R_PosInf = 1.0/R_Zero;
1552     R_NegInf = -1.0/R_Zero;
1553     R_Nan = R_Zero/R_Zero;
1554 
1555     /* Command table -- we initiialize it here as it is part of the
1556      * initial configuration, since command names may be changed via
1557      * redis.conf using the rename-command directive. */
1558     server.commands = dictCreate(&commandTableDictType,NULL);
1559     server.orig_commands = dictCreate(&commandTableDictType,NULL);
1560     populateCommandTable();
1561     server.delCommand = lookupCommandByCString("del");
1562     server.multiCommand = lookupCommandByCString("multi");
1563     server.lpushCommand = lookupCommandByCString("lpush");
1564     server.lpopCommand = lookupCommandByCString("lpop");
1565     server.rpopCommand = lookupCommandByCString("rpop");
1566     server.sremCommand = lookupCommandByCString("srem");
1567     server.execCommand = lookupCommandByCString("exec");
1568 
1569     /* Slow log */
1570     server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;
1571     server.slowlog_max_len = CONFIG_DEFAULT_SLOWLOG_MAX_LEN;
1572 
1573     /* Latency monitor */
1574     server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD;
1575 
1576     /* Debugging */
1577     server.assert_failed = "<no assertion failed>";
1578     server.assert_file = "<no file>";
1579     server.assert_line = 0;
1580     server.bug_report_start = 0;
1581     server.watchdog_period = 0;
1582 }
1583 
1584 extern char **environ;
1585 
1586 /* Restart the server, executing the same executable that started this
1587  * instance, with the same arguments and configuration file.
1588  *
1589  * The function is designed to directly call execve() so that the new
1590  * server instance will retain the PID of the previous one.
1591  *
1592  * The list of flags, that may be bitwise ORed together, alter the
1593  * behavior of this function:
1594  *
1595  * RESTART_SERVER_NONE              No flags.
1596  * RESTART_SERVER_GRACEFULLY        Do a proper shutdown before restarting.
1597  * RESTART_SERVER_CONFIG_REWRITE    Rewrite the config file before restarting.
1598  *
1599  * On success the function does not return, because the process turns into
1600  * a different process. On error C_ERR is returned. */
1601 int restartServer(int flags, mstime_t delay) {
1602     int j;
1603 
1604     /* Check if we still have accesses to the executable that started this
1605      * server instance. */
1606     if (access(server.executable,X_OK) == -1) return C_ERR;
1607 
1608     /* Config rewriting. */
1609     if (flags & RESTART_SERVER_CONFIG_REWRITE &&
1610         server.configfile &&
1611         rewriteConfig(server.configfile) == -1) return C_ERR;
1612 
1613     /* Perform a proper shutdown. */
1614     if (flags & RESTART_SERVER_GRACEFULLY &&
1615         prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK) return C_ERR;
1616 
1617     /* Close all file descriptors, with the exception of stdin, stdout, strerr
1618      * which are useful if we restart a Redis server which is not daemonized. */
1619     for (j = 3; j < (int)server.maxclients + 1024; j++) close(j);
1620 
1621     /* Execute the server with the original command line. */
1622     if (delay) usleep(delay*1000);
1623     execve(server.executable,server.exec_argv,environ);
1624 
1625     /* If an error occurred here, there is nothing we can do, but exit. */
1626     _exit(1);
1627 
1628     return C_ERR; /* Never reached. */
1629 }
1630 
1631 /* This function will try to raise the max number of open files accordingly to
1632  * the configured max number of clients. It also reserves a number of file
1633  * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
1634  * persistence, listening sockets, log files and so forth.
1635  *
1636  * If it will not be possible to set the limit accordingly to the configured
1637  * max number of clients, the function will do the reverse setting
1638  * server.maxclients to the value that we can actually handle. */
1639 void adjustOpenFilesLimit(void) {
1640     rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS;
1641     struct rlimit limit;
1642 
1643     if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {
1644         serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
1645             strerror(errno));
1646         server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS;
1647     } else {
1648         rlim_t oldlimit = limit.rlim_cur;
1649 
1650         /* Set the max number of files if the current limit is not enough
1651          * for our needs. */
1652         if (oldlimit < maxfiles) {
1653             rlim_t bestlimit;
1654             int setrlimit_error = 0;
1655 
1656             /* Try to set the file limit to match 'maxfiles' or at least
1657              * to the higher value supported less than maxfiles. */
1658             bestlimit = maxfiles;
1659             while(bestlimit > oldlimit) {
1660                 rlim_t decr_step = 16;
1661 
1662                 limit.rlim_cur = bestlimit;
1663                 limit.rlim_max = bestlimit;
1664                 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break;
1665                 setrlimit_error = errno;
1666 
1667                 /* We failed to set file limit to 'bestlimit'. Try with a
1668                  * smaller limit decrementing by a few FDs per iteration. */
1669                 if (bestlimit < decr_step) break;
1670                 bestlimit -= decr_step;
1671             }
1672 
1673             /* Assume that the limit we get initially is still valid if
1674              * our last try was even lower. */
1675             if (bestlimit < oldlimit) bestlimit = oldlimit;
1676 
1677             if (bestlimit < maxfiles) {
1678                 int old_maxclients = server.maxclients;
1679                 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS;
1680                 if (server.maxclients < 1) {
1681                     serverLog(LL_WARNING,"Your current 'ulimit -n' "
1682                         "of %llu is not enough for the server to start. "
1683                         "Please increase your open file limit to at least "
1684                         "%llu. Exiting.",
1685                         (unsigned long long) oldlimit,
1686                         (unsigned long long) maxfiles);
1687                     exit(1);
1688                 }
1689                 serverLog(LL_WARNING,"You requested maxclients of %d "
1690                     "requiring at least %llu max file descriptors.",
1691                     old_maxclients,
1692                     (unsigned long long) maxfiles);
1693                 serverLog(LL_WARNING,"Server can't set maximum open files "
1694                     "to %llu because of OS error: %s.",
1695                     (unsigned long long) maxfiles, strerror(setrlimit_error));
1696                 serverLog(LL_WARNING,"Current maximum open files is %llu. "
1697                     "maxclients has been reduced to %d to compensate for "
1698                     "low ulimit. "
1699                     "If you need higher maxclients increase 'ulimit -n'.",
1700                     (unsigned long long) bestlimit, server.maxclients);
1701             } else {
1702                 serverLog(LL_NOTICE,"Increased maximum number of open files "
1703                     "to %llu (it was originally set to %llu).",
1704                     (unsigned long long) maxfiles,
1705                     (unsigned long long) oldlimit);
1706             }
1707         }
1708     }
1709 }
1710 
1711 /* Check that server.tcp_backlog can be actually enforced in Linux according
1712  * to the value of /proc/sys/net/core/somaxconn, or warn about it. */
1713 void checkTcpBacklogSettings(void) {
1714 #ifdef HAVE_PROC_SOMAXCONN
1715     FILE *fp = fopen("/proc/sys/net/core/somaxconn","r");
1716     char buf[1024];
1717     if (!fp) return;
1718     if (fgets(buf,sizeof(buf),fp) != NULL) {
1719         int somaxconn = atoi(buf);
1720         if (somaxconn > 0 && somaxconn < server.tcp_backlog) {
1721             serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, somaxconn);
1722         }
1723     }
1724     fclose(fp);
1725 #endif
1726 }
1727 
1728 /* Initialize a set of file descriptors to listen to the specified 'port'
1729  * binding the addresses specified in the Redis server configuration.
1730  *
1731  * The listening file descriptors are stored in the integer array 'fds'
1732  * and their number is set in '*count'.
1733  *
1734  * The addresses to bind are specified in the global server.bindaddr array
1735  * and their number is server.bindaddr_count. If the server configuration
1736  * contains no specific addresses to bind, this function will try to
1737  * bind * (all addresses) for both the IPv4 and IPv6 protocols.
1738  *
1739  * On success the function returns C_OK.
1740  *
1741  * On error the function returns C_ERR. For the function to be on
1742  * error, at least one of the server.bindaddr addresses was
1743  * impossible to bind, or no bind addresses were specified in the server
1744  * configuration but the function is not able to bind * for at least
1745  * one of the IPv4 or IPv6 protocols. */
1746 int listenToPort(int port, int *fds, int *count) {
1747     int j;
1748 
1749     /* Force binding of 0.0.0.0 if no bind address is specified, always
1750      * entering the loop if j == 0. */
1751     if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
1752     for (j = 0; j < server.bindaddr_count || j == 0; j++) {
1753         if (server.bindaddr[j] == NULL) {
1754             /* Bind * for both IPv6 and IPv4, we enter here only if
1755              * server.bindaddr_count == 0. */
1756             fds[*count] = anetTcp6Server(server.neterr,port,NULL,
1757                 server.tcp_backlog);
1758             if (fds[*count] != ANET_ERR) {
1759                 anetNonBlock(NULL,fds[*count]);
1760                 (*count)++;
1761             }
1762             fds[*count] = anetTcpServer(server.neterr,port,NULL,
1763                 server.tcp_backlog);
1764             if (fds[*count] != ANET_ERR) {
1765                 anetNonBlock(NULL,fds[*count]);
1766                 (*count)++;
1767             }
1768             /* Exit the loop if we were able to bind * on IPv4 or IPv6,
1769              * otherwise fds[*count] will be ANET_ERR and we'll print an
1770              * error and return to the caller with an error. */
1771             if (*count) break;
1772         } else if (strchr(server.bindaddr[j],':')) {
1773             /* Bind IPv6 address. */
1774             fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
1775                 server.tcp_backlog);
1776         } else {
1777             /* Bind IPv4 address. */
1778             fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
1779                 server.tcp_backlog);
1780         }
1781         if (fds[*count] == ANET_ERR) {
1782             serverLog(LL_WARNING,
1783                 "Creating Server TCP listening socket %s:%d: %s",
1784                 server.bindaddr[j] ? server.bindaddr[j] : "*",
1785                 port, server.neterr);
1786             return C_ERR;
1787         }
1788         anetNonBlock(NULL,fds[*count]);
1789         (*count)++;
1790     }
1791     return C_OK;
1792 }
1793 
1794 /* Resets the stats that we expose via INFO or other means that we want
1795  * to reset via CONFIG RESETSTAT. The function is also used in order to
1796  * initialize these fields in initServer() at server startup. */
1797 void resetServerStats(void) {
1798     int j;
1799 
1800     server.stat_numcommands = 0;
1801     server.stat_numconnections = 0;
1802     server.stat_expiredkeys = 0;
1803     server.stat_evictedkeys = 0;
1804     server.stat_keyspace_misses = 0;
1805     server.stat_keyspace_hits = 0;
1806     server.stat_fork_time = 0;
1807     server.stat_fork_rate = 0;
1808     server.stat_rejected_conn = 0;
1809     server.stat_sync_full = 0;
1810     server.stat_sync_partial_ok = 0;
1811     server.stat_sync_partial_err = 0;
1812     for (j = 0; j < STATS_METRIC_COUNT; j++) {
1813         server.inst_metric[j].idx = 0;
1814         server.inst_metric[j].last_sample_time = mstime();
1815         server.inst_metric[j].last_sample_count = 0;
1816         memset(server.inst_metric[j].samples,0,
1817             sizeof(server.inst_metric[j].samples));
1818     }
1819     server.stat_net_input_bytes = 0;
1820     server.stat_net_output_bytes = 0;
1821     server.aof_delayed_fsync = 0;
1822 }
1823 
1824 void initServer(void) {
1825     int j;
1826 
1827     signal(SIGHUP, SIG_IGN);
1828     signal(SIGPIPE, SIG_IGN);
1829     setupSignalHandlers();
1830 
1831     if (server.syslog_enabled) {
1832         openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
1833             server.syslog_facility);
1834     }
1835 
1836     server.pid = getpid();
1837     server.current_client = NULL;
1838     server.clients = listCreate();
1839     server.clients_to_close = listCreate();
1840     server.slaves = listCreate();
1841     server.monitors = listCreate();
1842     server.clients_pending_write = listCreate();
1843     server.slaveseldb = -1; /* Force to emit the first SELECT command. */
1844     server.unblocked_clients = listCreate();
1845     server.ready_keys = listCreate();
1846     server.clients_waiting_acks = listCreate();
1847     server.get_ack_from_slaves = 0;
1848     server.clients_paused = 0;
1849     server.system_memory_size = zmalloc_get_memory_size();
1850 
1851     createSharedObjects();
1852     adjustOpenFilesLimit();
1853     server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
1854     server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1855 
1856     /* Open the TCP listening socket for the user commands. */
1857     if (server.port != 0 &&
1858         listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
1859         exit(1);
1860 
1861     /* Open the listening Unix domain socket. */
1862     if (server.unixsocket != NULL) {
1863         unlink(server.unixsocket); /* don't care if this fails */
1864         server.sofd = anetUnixServer(server.neterr,server.unixsocket,
1865             server.unixsocketperm, server.tcp_backlog);
1866         if (server.sofd == ANET_ERR) {
1867             serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
1868             exit(1);
1869         }
1870         anetNonBlock(NULL,server.sofd);
1871     }
1872 
1873     /* Abort if there are no listening sockets at all. */
1874     if (server.ipfd_count == 0 && server.sofd < 0) {
1875         serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
1876         exit(1);
1877     }
1878 
1879     /* Create the Redis databases, and initialize other internal state. */
1880     for (j = 0; j < server.dbnum; j++) {
1881         server.db[j].dict = dictCreate(&dbDictType,NULL);
1882         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
1883         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
1884         server.db[j].ready_keys = dictCreate(&setDictType,NULL);
1885         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
1886         server.db[j].eviction_pool = evictionPoolAlloc();
1887         server.db[j].id = j;
1888         server.db[j].avg_ttl = 0;
1889     }
1890     server.pubsub_channels = dictCreate(&keylistDictType,NULL);
1891     server.pubsub_patterns = listCreate();
1892     listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
1893     listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
1894     server.cronloops = 0;
1895     server.rdb_child_pid = -1;
1896     server.aof_child_pid = -1;
1897     server.rdb_child_type = RDB_CHILD_TYPE_NONE;
1898     aofRewriteBufferReset();
1899     server.aof_buf = sdsempty();
1900     server.lastsave = time(NULL); /* At startup we consider the DB saved. */
1901     server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
1902     server.rdb_save_time_last = -1;
1903     server.rdb_save_time_start = -1;
1904     server.dirty = 0;
1905     resetServerStats();
1906     /* A few stats we don't want to reset: server startup time, and peak mem. */
1907     server.stat_starttime = time(NULL);
1908     server.stat_peak_memory = 0;
1909     server.resident_set_size = 0;
1910     server.lastbgsave_status = C_OK;
1911     server.aof_last_write_status = C_OK;
1912     server.aof_last_write_errno = 0;
1913     server.repl_good_slaves_count = 0;
1914     updateCachedTime();
1915 
1916     /* Create the serverCron() time event, that's our main way to process
1917      * background operations. */
1918     if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
1919         serverPanic("Can't create the serverCron time event.");
1920         exit(1);
1921     }
1922 
1923     /* Create an event handler for accepting new connections in TCP and Unix
1924      * domain sockets. */
1925     for (j = 0; j < server.ipfd_count; j++) {
1926         if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
1927             acceptTcpHandler,NULL) == AE_ERR)
1928             {
1929                 serverPanic(
1930                     "Unrecoverable error creating server.ipfd file event.");
1931             }
1932     }
1933     if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
1934         acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
1935 
1936     /* Open the AOF file if needed. */
1937     if (server.aof_state == AOF_ON) {
1938         server.aof_fd = open(server.aof_filename,
1939                                O_WRONLY|O_APPEND|O_CREAT,0644);
1940         if (server.aof_fd == -1) {
1941             serverLog(LL_WARNING, "Can't open the append-only file: %s",
1942                 strerror(errno));
1943             exit(1);
1944         }
1945     }
1946 
1947     /* 32 bit instances are limited to 4GB of address space, so if there is
1948      * no explicit limit in the user provided configuration we set a limit
1949      * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
1950      * useless crashes of the Redis instance for out of memory. */
1951     if (server.arch_bits == 32 && server.maxmemory == 0) {
1952         serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
1953         server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
1954         server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
1955     }
1956 
1957     if (server.cluster_enabled) clusterInit();
1958     replicationScriptCacheInit();
1959     scriptingInit(1);
1960     slowlogInit();
1961     latencyMonitorInit();
1962     bioInit();
1963 }
1964 
1965 /* Populates the Redis Command Table starting from the hard coded list
1966  * we have on top of redis.c file. */
1967 void populateCommandTable(void) {
1968     int j;
1969     int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
1970 
1971     for (j = 0; j < numcommands; j++) {
1972         struct redisCommand *c = redisCommandTable+j;
1973         char *f = c->sflags;
1974         int retval1, retval2;
1975 
1976         while(*f != '\0') {
1977             switch(*f) {
1978             case 'w': c->flags |= CMD_WRITE; break;
1979             case 'r': c->flags |= CMD_READONLY; break;
1980             case 'm': c->flags |= CMD_DENYOOM; break;
1981             case 'a': c->flags |= CMD_ADMIN; break;
1982             case 'p': c->flags |= CMD_PUBSUB; break;
1983             case 's': c->flags |= CMD_NOSCRIPT; break;
1984             case 'R': c->flags |= CMD_RANDOM; break;
1985             case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
1986             case 'l': c->flags |= CMD_LOADING; break;
1987             case 't': c->flags |= CMD_STALE; break;
1988             case 'M': c->flags |= CMD_SKIP_MONITOR; break;
1989             case 'k': c->flags |= CMD_ASKING; break;
1990             case 'F': c->flags |= CMD_FAST; break;
1991             default: serverPanic("Unsupported command flag"); break;
1992             }
1993             f++;
1994         }
1995 
1996         retval1 = dictAdd(server.commands, sdsnew(c->name), c);
1997         /* Populate an additional dictionary that will be unaffected
1998          * by rename-command statements in redis.conf. */
1999         retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
2000         serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
2001     }
2002 }
2003 
2004 void resetCommandTableStats(void) {
2005     int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
2006     int j;
2007 
2008     for (j = 0; j < numcommands; j++) {
2009         struct redisCommand *c = redisCommandTable+j;
2010 
2011         c->microseconds = 0;
2012         c->calls = 0;
2013     }
2014 }
2015 
2016 /* ========================== Redis OP Array API ============================ */
2017 
2018 void redisOpArrayInit(redisOpArray *oa) {
2019     oa->ops = NULL;
2020     oa->numops = 0;
2021 }
2022 
2023 int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
2024                        robj **argv, int argc, int target)
2025 {
2026     redisOp *op;
2027 
2028     oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
2029     op = oa->ops+oa->numops;
2030     op->cmd = cmd;
2031     op->dbid = dbid;
2032     op->argv = argv;
2033     op->argc = argc;
2034     op->target = target;
2035     oa->numops++;
2036     return oa->numops;
2037 }
2038 
2039 void redisOpArrayFree(redisOpArray *oa) {
2040     while(oa->numops) {
2041         int j;
2042         redisOp *op;
2043 
2044         oa->numops--;
2045         op = oa->ops+oa->numops;
2046         for (j = 0; j < op->argc; j++)
2047             decrRefCount(op->argv[j]);
2048         zfree(op->argv);
2049     }
2050     zfree(oa->ops);
2051 }
2052 
2053 /* ====================== Commands lookup and execution ===================== */
2054 
2055 struct redisCommand *lookupCommand(sds name) {
2056     return dictFetchValue(server.commands, name);
2057 }
2058 
2059 struct redisCommand *lookupCommandByCString(char *s) {
2060     struct redisCommand *cmd;
2061     sds name = sdsnew(s);
2062 
2063     cmd = dictFetchValue(server.commands, name);
2064     sdsfree(name);
2065     return cmd;
2066 }
2067 
2068 /* Lookup the command in the current table, if not found also check in
2069  * the original table containing the original command names unaffected by
2070  * redis.conf rename-command statement.
2071  *
2072  * This is used by functions rewriting the argument vector such as
2073  * rewriteClientCommandVector() in order to set client->cmd pointer
2074  * correctly even if the command was renamed. */
2075 struct redisCommand *lookupCommandOrOriginal(sds name) {
2076     struct redisCommand *cmd = dictFetchValue(server.commands, name);
2077 
2078     if (!cmd) cmd = dictFetchValue(server.orig_commands,name);
2079     return cmd;
2080 }
2081 
2082 /* Propagate the specified command (in the context of the specified database id)
2083  * to AOF and Slaves.
2084  *
2085  * flags are an xor between:
2086  * + PROPAGATE_NONE (no propagation of command at all)
2087  * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
2088  * + PROPAGATE_REPL (propagate into the replication link)
2089  *
2090  * This should not be used inside commands implementation. Use instead
2091  * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
2092  */
2093 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2094                int flags)
2095 {
2096     if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
2097         feedAppendOnlyFile(cmd,dbid,argv,argc);
2098     if (flags & PROPAGATE_REPL)
2099         replicationFeedSlaves(server.slaves,dbid,argv,argc);
2100 }
2101 
2102 /* Used inside commands to schedule the propagation of additional commands
2103  * after the current command is propagated to AOF / Replication.
2104  *
2105  * 'cmd' must be a pointer to the Redis command to replicate, dbid is the
2106  * database ID the command should be propagated into.
2107  * Arguments of the command to propagte are passed as an array of redis
2108  * objects pointers of len 'argc', using the 'argv' vector.
2109  *
2110  * The function does not take a reference to the passed 'argv' vector,
2111  * so it is up to the caller to release the passed argv (but it is usually
2112  * stack allocated).  The function autoamtically increments ref count of
2113  * passed objects, so the caller does not need to. */
2114 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2115                    int target)
2116 {
2117     robj **argvcopy;
2118     int j;
2119 
2120     if (server.loading) return; /* No propagation during loading. */
2121 
2122     argvcopy = zmalloc(sizeof(robj*)*argc);
2123     for (j = 0; j < argc; j++) {
2124         argvcopy[j] = argv[j];
2125         incrRefCount(argv[j]);
2126     }
2127     redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target);
2128 }
2129 
2130 /* It is possible to call the function forceCommandPropagation() inside a
2131  * Redis command implementation in order to to force the propagation of a
2132  * specific command execution into AOF / Replication. */
2133 void forceCommandPropagation(client *c, int flags) {
2134     if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL;
2135     if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
2136 }
2137 
2138 /* Avoid that the executed command is propagated at all. This way we
2139  * are free to just propagate what we want using the alsoPropagate()
2140  * API. */
2141 void preventCommandPropagation(client *c) {
2142     c->flags |= CLIENT_PREVENT_PROP;
2143 }
2144 
2145 /* AOF specific version of preventCommandPropagation(). */
2146 void preventCommandAOF(client *c) {
2147     c->flags |= CLIENT_PREVENT_AOF_PROP;
2148 }
2149 
2150 /* Replication specific version of preventCommandPropagation(). */
2151 void preventCommandReplication(client *c) {
2152     c->flags |= CLIENT_PREVENT_REPL_PROP;
2153 }
2154 
2155 /* Call() is the core of Redis execution of a command.
2156  *
2157  * The following flags can be passed:
2158  * CMD_CALL_NONE        No flags.
2159  * CMD_CALL_SLOWLOG     Check command speed and log in the slow log if needed.
2160  * CMD_CALL_STATS       Populate command stats.
2161  * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
2162  *                          or if the client flags are forcing propagation.
2163  * CMD_CALL_PROPAGATE_REPL  Send command to salves if it modified the dataset
2164  *                          or if the client flags are forcing propagation.
2165  * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
2166  * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
2167  *
2168  * The exact propagation behavior depends on the client flags.
2169  * Specifically:
2170  *
2171  * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
2172  *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
2173  *    in the call flags, then the command is propagated even if the
2174  *    dataset was not affected by the command.
2175  * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
2176  *    are set, the propagation into AOF or to slaves is not performed even
2177  *    if the command modified the dataset.
2178  *
2179  * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
2180  * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
2181  * slaves propagation will never occur.
2182  *
2183  * Client flags are modified by the implementation of a given command
2184  * using the following API:
2185  *
2186  * forceCommandPropagation(client *c, int flags);
2187  * preventCommandPropagation(client *c);
2188  * preventCommandAOF(client *c);
2189  * preventCommandReplication(client *c);
2190  *
2191  */
2192 void call(client *c, int flags) {
2193     long long dirty, start, duration;
2194     int client_old_flags = c->flags;
2195 
2196     /* Sent the command to clients in MONITOR mode, only if the commands are
2197      * not generated from reading an AOF. */
2198     if (listLength(server.monitors) &&
2199         !server.loading &&
2200         !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
2201     {
2202         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
2203     }
2204 
2205     /* Initialization: clear the flags that must be set by the command on
2206      * demand, and initialize the array for additional commands propagation. */
2207     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2208     redisOpArrayInit(&server.also_propagate);
2209 
2210     /* Call the command. */
2211     dirty = server.dirty;
2212     start = ustime();
2213     c->cmd->proc(c);
2214     duration = ustime()-start;
2215     dirty = server.dirty-dirty;
2216     if (dirty < 0) dirty = 0;
2217 
2218     /* When EVAL is called loading the AOF we don't want commands called
2219      * from Lua to go into the slowlog or to populate statistics. */
2220     if (server.loading && c->flags & CLIENT_LUA)
2221         flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
2222 
2223     /* If the caller is Lua, we want to force the EVAL caller to propagate
2224      * the script if the command flag or client flag are forcing the
2225      * propagation. */
2226     if (c->flags & CLIENT_LUA && server.lua_caller) {
2227         if (c->flags & CLIENT_FORCE_REPL)
2228             server.lua_caller->flags |= CLIENT_FORCE_REPL;
2229         if (c->flags & CLIENT_FORCE_AOF)
2230             server.lua_caller->flags |= CLIENT_FORCE_AOF;
2231     }
2232 
2233     /* Log the command into the Slow log if needed, and populate the
2234      * per-command statistics that we show in INFO commandstats. */
2235     if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
2236         char *latency_event = (c->cmd->flags & CMD_FAST) ?
2237                               "fast-command" : "command";
2238         latencyAddSampleIfNeeded(latency_event,duration/1000);
2239         slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
2240     }
2241     if (flags & CMD_CALL_STATS) {
2242         c->cmd->microseconds += duration;
2243         c->cmd->calls++;
2244     }
2245 
2246     /* Propagate the command into the AOF and replication link */
2247     if (flags & CMD_CALL_PROPAGATE &&
2248         (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
2249     {
2250         int propagate_flags = PROPAGATE_NONE;
2251 
2252         /* Check if the command operated changes in the data set. If so
2253          * set for replication / AOF propagation. */
2254         if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
2255 
2256         /* If the client forced AOF / replication of the command, set
2257          * the flags regardless of the command effects on the data set. */
2258         if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
2259         if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
2260 
2261         /* However prevent AOF / replication propagation if the command
2262          * implementatino called preventCommandPropagation() or similar,
2263          * or if we don't have the call() flags to do so. */
2264         if (c->flags & CLIENT_PREVENT_REPL_PROP ||
2265             !(flags & CMD_CALL_PROPAGATE_REPL))
2266                 propagate_flags &= ~PROPAGATE_REPL;
2267         if (c->flags & CLIENT_PREVENT_AOF_PROP ||
2268             !(flags & CMD_CALL_PROPAGATE_AOF))
2269                 propagate_flags &= ~PROPAGATE_AOF;
2270 
2271         /* Call propagate() only if at least one of AOF / replication
2272          * propagation is needed. */
2273         if (propagate_flags != PROPAGATE_NONE)
2274             propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
2275     }
2276 
2277     /* Restore the old replication flags, since call() can be executed
2278      * recursively. */
2279     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2280     c->flags |= client_old_flags &
2281         (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2282 
2283     /* Handle the alsoPropagate() API to handle commands that want to propagate
2284      * multiple separated commands. Note that alsoPropagate() is not affected
2285      * by CLIENT_PREVENT_PROP flag. */
2286     if (server.also_propagate.numops) {
2287         int j;
2288         redisOp *rop;
2289 
2290         if (flags & CMD_CALL_PROPAGATE) {
2291             for (j = 0; j < server.also_propagate.numops; j++) {
2292                 rop = &server.also_propagate.ops[j];
2293                 int target = rop->target;
2294                 /* Whatever the command wish is, we honor the call() flags. */
2295                 if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
2296                 if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
2297                 if (target)
2298                     propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
2299             }
2300         }
2301         redisOpArrayFree(&server.also_propagate);
2302     }
2303     server.stat_numcommands++;
2304 }
2305 
2306 /* If this function gets called we already read a whole
2307  * command, arguments are in the client argv/argc fields.
2308  * processCommand() execute the command or prepare the
2309  * server for a bulk read from the client.
2310  *
2311  * If 1 is returned the client is still alive and valid and
2312  * other operations can be performed by the caller. Otherwise
2313  * if 0 is returned the client was destroyed (i.e. after QUIT). */
2314 int processCommand(client *c) {
2315     /* The QUIT command is handled separately. Normal command procs will
2316      * go through checking for replication and QUIT will cause trouble
2317      * when FORCE_REPLICATION is enabled and would be implemented in
2318      * a regular command proc. */
2319     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
2320         addReply(c,shared.ok);
2321         c->flags |= CLIENT_CLOSE_AFTER_REPLY;
2322         return C_ERR;
2323     }
2324 
2325     /* Now lookup the command and check ASAP about trivial error conditions
2326      * such as wrong arity, bad command name and so forth. */
2327     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
2328     if (!c->cmd) {
2329         flagTransaction(c);
2330         addReplyErrorFormat(c,"unknown command '%s'",
2331             (char*)c->argv[0]->ptr);
2332         return C_OK;
2333     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
2334                (c->argc < -c->cmd->arity)) {
2335         flagTransaction(c);
2336         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2337             c->cmd->name);
2338         return C_OK;
2339     }
2340 
2341     /* Check if the user is authenticated */
2342     if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
2343     {
2344         flagTransaction(c);
2345         addReply(c,shared.noautherr);
2346         return C_OK;
2347     }
2348 
2349     /* If cluster is enabled perform the cluster redirection here.
2350      * However we don't perform the redirection if:
2351      * 1) The sender of this command is our master.
2352      * 2) The command has no key arguments. */
2353     if (server.cluster_enabled &&
2354         !(c->flags & CLIENT_MASTER) &&
2355         !(c->flags & CLIENT_LUA &&
2356           server.lua_caller->flags & CLIENT_MASTER) &&
2357         !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
2358     {
2359         int hashslot;
2360 
2361         if (server.cluster->state != CLUSTER_OK) {
2362             flagTransaction(c);
2363             clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
2364             return C_OK;
2365         } else {
2366             int error_code;
2367             clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
2368             if (n == NULL || n != server.cluster->myself) {
2369                 flagTransaction(c);
2370                 clusterRedirectClient(c,n,hashslot,error_code);
2371                 return C_OK;
2372             }
2373         }
2374     }
2375 
2376     /* Handle the maxmemory directive.
2377      *
2378      * First we try to free some memory if possible (if there are volatile
2379      * keys in the dataset). If there are not the only thing we can do
2380      * is returning an error. */
2381     if (server.maxmemory) {
2382         int retval = freeMemoryIfNeeded();
2383         if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
2384             flagTransaction(c);
2385             addReply(c, shared.oomerr);
2386             return C_OK;
2387         }
2388     }
2389 
2390     /* Don't accept write commands if there are problems persisting on disk
2391      * and if this is a master instance. */
2392     if (((server.stop_writes_on_bgsave_err &&
2393           server.saveparamslen > 0 &&
2394           server.lastbgsave_status == C_ERR) ||
2395           server.aof_last_write_status == C_ERR) &&
2396         server.masterhost == NULL &&
2397         (c->cmd->flags & CMD_WRITE ||
2398          c->cmd->proc == pingCommand))
2399     {
2400         flagTransaction(c);
2401         if (server.aof_last_write_status == C_OK)
2402             addReply(c, shared.bgsaveerr);
2403         else
2404             addReplySds(c,
2405                 sdscatprintf(sdsempty(),
2406                 "-MISCONF Errors writing to the AOF file: %s\r\n",
2407                 strerror(server.aof_last_write_errno)));
2408         return C_OK;
2409     }
2410 
2411     /* Don't accept write commands if there are not enough good slaves and
2412      * user configured the min-slaves-to-write option. */
2413     if (server.masterhost == NULL &&
2414         server.repl_min_slaves_to_write &&
2415         server.repl_min_slaves_max_lag &&
2416         c->cmd->flags & CMD_WRITE &&
2417         server.repl_good_slaves_count < server.repl_min_slaves_to_write)
2418     {
2419         flagTransaction(c);
2420         addReply(c, shared.noreplicaserr);
2421         return C_OK;
2422     }
2423 
2424     /* Don't accept write commands if this is a read only slave. But
2425      * accept write commands if this is our master. */
2426     if (server.masterhost && server.repl_slave_ro &&
2427         !(c->flags & CLIENT_MASTER) &&
2428         c->cmd->flags & CMD_WRITE)
2429     {
2430         addReply(c, shared.roslaveerr);
2431         return C_OK;
2432     }
2433 
2434     /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
2435     if (c->flags & CLIENT_PUBSUB &&
2436         c->cmd->proc != pingCommand &&
2437         c->cmd->proc != subscribeCommand &&
2438         c->cmd->proc != unsubscribeCommand &&
2439         c->cmd->proc != psubscribeCommand &&
2440         c->cmd->proc != punsubscribeCommand) {
2441         addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
2442         return C_OK;
2443     }
2444 
2445     /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
2446      * we are a slave with a broken link with master. */
2447     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
2448         server.repl_serve_stale_data == 0 &&
2449         !(c->cmd->flags & CMD_STALE))
2450     {
2451         flagTransaction(c);
2452         addReply(c, shared.masterdownerr);
2453         return C_OK;
2454     }
2455 
2456     /* Loading DB? Return an error if the command has not the
2457      * CMD_LOADING flag. */
2458     if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
2459         addReply(c, shared.loadingerr);
2460         return C_OK;
2461     }
2462 
2463     /* Lua script too slow? Only allow a limited number of commands. */
2464     if (server.lua_timedout &&
2465           c->cmd->proc != authCommand &&
2466           c->cmd->proc != replconfCommand &&
2467         !(c->cmd->proc == shutdownCommand &&
2468           c->argc == 2 &&
2469           tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
2470         !(c->cmd->proc == scriptCommand &&
2471           c->argc == 2 &&
2472           tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
2473     {
2474         flagTransaction(c);
2475         addReply(c, shared.slowscripterr);
2476         return C_OK;
2477     }
2478 
2479     /* Exec the command */
2480     if (c->flags & CLIENT_MULTI &&
2481         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
2482         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
2483     {
2484         queueMultiCommand(c);
2485         addReply(c,shared.queued);
2486     } else {
2487         call(c,CMD_CALL_FULL);
2488         c->woff = server.master_repl_offset;
2489         if (listLength(server.ready_keys))
2490             handleClientsBlockedOnLists();
2491     }
2492     return C_OK;
2493 }
2494 
2495 /*================================== Shutdown =============================== */
2496 
2497 /* Close listening sockets. Also unlink the unix domain socket if
2498  * unlink_unix_socket is non-zero. */
2499 void closeListeningSockets(int unlink_unix_socket) {
2500     int j;
2501 
2502     for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]);
2503     if (server.sofd != -1) close(server.sofd);
2504     if (server.cluster_enabled)
2505         for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]);
2506     if (unlink_unix_socket && server.unixsocket) {
2507         serverLog(LL_NOTICE,"Removing the unix socket file.");
2508         unlink(server.unixsocket); /* don't care if this fails */
2509     }
2510 }
2511 
2512 int prepareForShutdown(int flags) {
2513     int save = flags & SHUTDOWN_SAVE;
2514     int nosave = flags & SHUTDOWN_NOSAVE;
2515 
2516     serverLog(LL_WARNING,"User requested shutdown...");
2517 
2518     /* Kill all the Lua debugger forked sessions. */
2519     ldbKillForkedSessions();
2520 
2521     /* Kill the saving child if there is a background saving in progress.
2522        We want to avoid race conditions, for instance our saving child may
2523        overwrite the synchronous saving did by SHUTDOWN. */
2524     if (server.rdb_child_pid != -1) {
2525         serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
2526         kill(server.rdb_child_pid,SIGUSR1);
2527         rdbRemoveTempFile(server.rdb_child_pid);
2528     }
2529 
2530     if (server.aof_state != AOF_OFF) {
2531         /* Kill the AOF saving child as the AOF we already have may be longer
2532          * but contains the full dataset anyway. */
2533         if (server.aof_child_pid != -1) {
2534             /* If we have AOF enabled but haven't written the AOF yet, don't
2535              * shutdown or else the dataset will be lost. */
2536             if (server.aof_state == AOF_WAIT_REWRITE) {
2537                 serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
2538                 return C_ERR;
2539             }
2540             serverLog(LL_WARNING,
2541                 "There is a child rewriting the AOF. Killing it!");
2542             kill(server.aof_child_pid,SIGUSR1);
2543         }
2544         /* Append only file: fsync() the AOF and exit */
2545         serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
2546         aof_fsync(server.aof_fd);
2547     }
2548 
2549     /* Create a new RDB file before exiting. */
2550     if ((server.saveparamslen > 0 && !nosave) || save) {
2551         serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
2552         /* Snapshotting. Perform a SYNC SAVE and exit */
2553         if (rdbSave(server.rdb_filename) != C_OK) {
2554             /* Ooops.. error saving! The best we can do is to continue
2555              * operating. Note that if there was a background saving process,
2556              * in the next cron() Redis will be notified that the background
2557              * saving aborted, handling special stuff like slaves pending for
2558              * synchronization... */
2559             serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
2560             return C_ERR;
2561         }
2562     }
2563 
2564     /* Remove the pid file if possible and needed. */
2565     if (server.daemonize || server.pidfile) {
2566         serverLog(LL_NOTICE,"Removing the pid file.");
2567         unlink(server.pidfile);
2568     }
2569 
2570     /* Best effort flush of slave output buffers, so that we hopefully
2571      * send them pending writes. */
2572     flushSlavesOutputBuffers();
2573 
2574     /* Close the listening sockets. Apparently this allows faster restarts. */
2575     closeListeningSockets(1);
2576     serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
2577         server.sentinel_mode ? "Sentinel" : "Redis");
2578     return C_OK;
2579 }
2580 
2581 /*================================== Commands =============================== */
2582 
2583 /* Return zero if strings are the same, non-zero if they are not.
2584  * The comparison is performed in a way that prevents an attacker to obtain
2585  * information about the nature of the strings just monitoring the execution
2586  * time of the function.
2587  *
2588  * Note that limiting the comparison length to strings up to 512 bytes we
2589  * can avoid leaking any information about the password length and any
2590  * possible branch misprediction related leak.
2591  */
2592 int time_independent_strcmp(char *a, char *b) {
2593     char bufa[CONFIG_AUTHPASS_MAX_LEN], bufb[CONFIG_AUTHPASS_MAX_LEN];
2594     /* The above two strlen perform len(a) + len(b) operations where either
2595      * a or b are fixed (our password) length, and the difference is only
2596      * relative to the length of the user provided string, so no information
2597      * leak is possible in the following two lines of code. */
2598     unsigned int alen = strlen(a);
2599     unsigned int blen = strlen(b);
2600     unsigned int j;
2601     int diff = 0;
2602 
2603     /* We can't compare strings longer than our static buffers.
2604      * Note that this will never pass the first test in practical circumstances
2605      * so there is no info leak. */
2606     if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;
2607 
2608     memset(bufa,0,sizeof(bufa));        /* Constant time. */
2609     memset(bufb,0,sizeof(bufb));        /* Constant time. */
2610     /* Again the time of the following two copies is proportional to
2611      * len(a) + len(b) so no info is leaked. */
2612     memcpy(bufa,a,alen);
2613     memcpy(bufb,b,blen);
2614 
2615     /* Always compare all the chars in the two buffers without
2616      * conditional expressions. */
2617     for (j = 0; j < sizeof(bufa); j++) {
2618         diff |= (bufa[j] ^ bufb[j]);
2619     }
2620     /* Length must be equal as well. */
2621     diff |= alen ^ blen;
2622     return diff; /* If zero strings are the same. */
2623 }
2624 
2625 void authCommand(client *c) {
2626     if (!server.requirepass) {
2627         addReplyError(c,"Client sent AUTH, but no password is set");
2628     } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
2629       c->authenticated = 1;
2630       addReply(c,shared.ok);
2631     } else {
2632       c->authenticated = 0;
2633       addReplyError(c,"invalid password");
2634     }
2635 }
2636 
2637 /* The PING command. It works in a different way if the client is in
2638  * in Pub/Sub mode. */
2639 void pingCommand(client *c) {
2640     /* The command takes zero or one arguments. */
2641     if (c->argc > 2) {
2642         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2643             c->cmd->name);
2644         return;
2645     }
2646 
2647     if (c->flags & CLIENT_PUBSUB) {
2648         addReply(c,shared.mbulkhdr[2]);
2649         addReplyBulkCBuffer(c,"pong",4);
2650         if (c->argc == 1)
2651             addReplyBulkCBuffer(c,"",0);
2652         else
2653             addReplyBulk(c,c->argv[1]);
2654     } else {
2655         if (c->argc == 1)
2656             addReply(c,shared.pong);
2657         else
2658             addReplyBulk(c,c->argv[1]);
2659     }
2660 }
2661 
2662 void echoCommand(client *c) {
2663     addReplyBulk(c,c->argv[1]);
2664 }
2665 
2666 void timeCommand(client *c) {
2667     struct timeval tv;
2668 
2669     /* gettimeofday() can only fail if &tv is a bad address so we
2670      * don't check for errors. */
2671     gettimeofday(&tv,NULL);
2672     addReplyMultiBulkLen(c,2);
2673     addReplyBulkLongLong(c,tv.tv_sec);
2674     addReplyBulkLongLong(c,tv.tv_usec);
2675 }
2676 
2677 /* Helper function for addReplyCommand() to output flags. */
2678 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) {
2679     if (cmd->flags & f) {
2680         addReplyStatus(c, reply);
2681         return 1;
2682     }
2683     return 0;
2684 }
2685 
2686 /* Output the representation of a Redis command. Used by the COMMAND command. */
2687 void addReplyCommand(client *c, struct redisCommand *cmd) {
2688     if (!cmd) {
2689         addReply(c, shared.nullbulk);
2690     } else {
2691         /* We are adding: command name, arg count, flags, first, last, offset */
2692         addReplyMultiBulkLen(c, 6);
2693         addReplyBulkCString(c, cmd->name);
2694         addReplyLongLong(c, cmd->arity);
2695 
2696         int flagcount = 0;
2697         void *flaglen = addDeferredMultiBulkLength(c);
2698         flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write");
2699         flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly");
2700         flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom");
2701         flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin");
2702         flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub");
2703         flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript");
2704         flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random");
2705         flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script");
2706         flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading");
2707         flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale");
2708         flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor");
2709         flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
2710         flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
2711         if (cmd->getkeys_proc) {
2712             addReplyStatus(c, "movablekeys");
2713             flagcount += 1;
2714         }
2715         setDeferredMultiBulkLength(c, flaglen, flagcount);
2716 
2717         addReplyLongLong(c, cmd->firstkey);
2718         addReplyLongLong(c, cmd->lastkey);
2719         addReplyLongLong(c, cmd->keystep);
2720     }
2721 }
2722 
2723 /* COMMAND <subcommand> <args> */
2724 void commandCommand(client *c) {
2725     dictIterator *di;
2726     dictEntry *de;
2727 
2728     if (c->argc == 1) {
2729         addReplyMultiBulkLen(c, dictSize(server.commands));
2730         di = dictGetIterator(server.commands);
2731         while ((de = dictNext(di)) != NULL) {
2732             addReplyCommand(c, dictGetVal(de));
2733         }
2734         dictReleaseIterator(di);
2735     } else if (!strcasecmp(c->argv[1]->ptr, "info")) {
2736         int i;
2737         addReplyMultiBulkLen(c, c->argc-2);
2738         for (i = 2; i < c->argc; i++) {
2739             addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr));
2740         }
2741     } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) {
2742         addReplyLongLong(c, dictSize(server.commands));
2743     } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) {
2744         struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
2745         int *keys, numkeys, j;
2746 
2747         if (!cmd) {
2748             addReplyErrorFormat(c,"Invalid command specified");
2749             return;
2750         } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) ||
2751                    ((c->argc-2) < -cmd->arity))
2752         {
2753             addReplyError(c,"Invalid number of arguments specified for command");
2754             return;
2755         }
2756 
2757         keys = getKeysFromCommand(cmd,c->argv+2,c->argc-2,&numkeys);
2758         addReplyMultiBulkLen(c,numkeys);
2759         for (j = 0; j < numkeys; j++) addReplyBulk(c,c->argv[keys[j]+2]);
2760         getKeysFreeResult(keys);
2761     } else {
2762         addReplyError(c, "Unknown subcommand or wrong number of arguments.");
2763         return;
2764     }
2765 }
2766 
2767 /* Convert an amount of bytes into a human readable string in the form
2768  * of 100B, 2G, 100M, 4K, and so forth. */
2769 void bytesToHuman(char *s, unsigned long long n) {
2770     double d;
2771 
2772     if (n < 1024) {
2773         /* Bytes */
2774         sprintf(s,"%lluB",n);
2775         return;
2776     } else if (n < (1024*1024)) {
2777         d = (double)n/(1024);
2778         sprintf(s,"%.2fK",d);
2779     } else if (n < (1024LL*1024*1024)) {
2780         d = (double)n/(1024*1024);
2781         sprintf(s,"%.2fM",d);
2782     } else if (n < (1024LL*1024*1024*1024)) {
2783         d = (double)n/(1024LL*1024*1024);
2784         sprintf(s,"%.2fG",d);
2785     } else if (n < (1024LL*1024*1024*1024*1024)) {
2786         d = (double)n/(1024LL*1024*1024*1024);
2787         sprintf(s,"%.2fT",d);
2788     } else if (n < (1024LL*1024*1024*1024*1024*1024)) {
2789         d = (double)n/(1024LL*1024*1024*1024*1024);
2790         sprintf(s,"%.2fP",d);
2791     } else {
2792         /* Let's hope we never need this */
2793         sprintf(s,"%lluB",n);
2794     }
2795 }
2796 
2797 /* Create the string returned by the INFO command. This is decoupled
2798  * by the INFO command itself as we need to report the same information
2799  * on memory corruption problems. */
2800 sds genRedisInfoString(char *section) {
2801     sds info = sdsempty();
2802     time_t uptime = server.unixtime-server.stat_starttime;
2803     int j, numcommands;
2804     struct rusage self_ru, c_ru;
2805     unsigned long lol, bib;
2806     int allsections = 0, defsections = 0;
2807     int sections = 0;
2808 
2809     if (section == NULL) section = "default";
2810     allsections = strcasecmp(section,"all") == 0;
2811     defsections = strcasecmp(section,"default") == 0;
2812 
2813     getrusage(RUSAGE_SELF, &self_ru);
2814     getrusage(RUSAGE_CHILDREN, &c_ru);
2815     getClientsMaxBuffers(&lol,&bib);
2816 
2817     /* Server */
2818     if (allsections || defsections || !strcasecmp(section,"server")) {
2819         static int call_uname = 1;
2820         static struct utsname name;
2821         char *mode;
2822 
2823         if (server.cluster_enabled) mode = "cluster";
2824         else if (server.sentinel_mode) mode = "sentinel";
2825         else mode = "standalone";
2826 
2827         if (sections++) info = sdscat(info,"\r\n");
2828 
2829         if (call_uname) {
2830             /* Uname can be slow and is always the same output. Cache it. */
2831             uname(&name);
2832             call_uname = 0;
2833         }
2834 
2835         info = sdscatprintf(info,
2836             "# Server\r\n"
2837             "redis_version:%s\r\n"
2838             "redis_git_sha1:%s\r\n"
2839             "redis_git_dirty:%d\r\n"
2840             "redis_build_id:%llx\r\n"
2841             "redis_mode:%s\r\n"
2842             "os:%s %s %s\r\n"
2843             "arch_bits:%d\r\n"
2844             "multiplexing_api:%s\r\n"
2845             "gcc_version:%d.%d.%d\r\n"
2846             "process_id:%ld\r\n"
2847             "run_id:%s\r\n"
2848             "tcp_port:%d\r\n"
2849             "uptime_in_seconds:%jd\r\n"
2850             "uptime_in_days:%jd\r\n"
2851             "hz:%d\r\n"
2852             "lru_clock:%ld\r\n"
2853             "executable:%s\r\n"
2854             "config_file:%s\r\n",
2855             REDIS_VERSION,
2856             redisGitSHA1(),
2857             strtol(redisGitDirty(),NULL,10) > 0,
2858             (unsigned long long) redisBuildId(),
2859             mode,
2860             name.sysname, name.release, name.machine,
2861             server.arch_bits,
2862             aeGetApiName(),
2863 #ifdef __GNUC__
2864             __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
2865 #else
2866             0,0,0,
2867 #endif
2868             (long) getpid(),
2869             server.runid,
2870             server.port,
2871             (intmax_t)uptime,
2872             (intmax_t)(uptime/(3600*24)),
2873             server.hz,
2874             (unsigned long) server.lruclock,
2875             server.executable ? server.executable : "",
2876             server.configfile ? server.configfile : "");
2877     }
2878 
2879     /* Clients */
2880     if (allsections || defsections || !strcasecmp(section,"clients")) {
2881         if (sections++) info = sdscat(info,"\r\n");
2882         info = sdscatprintf(info,
2883             "# Clients\r\n"
2884             "connected_clients:%lu\r\n"
2885             "client_longest_output_list:%lu\r\n"
2886             "client_biggest_input_buf:%lu\r\n"
2887             "blocked_clients:%d\r\n",
2888             listLength(server.clients)-listLength(server.slaves),
2889             lol, bib,
2890             server.bpop_blocked_clients);
2891     }
2892 
2893     /* Memory */
2894     if (allsections || defsections || !strcasecmp(section,"memory")) {
2895         char hmem[64];
2896         char peak_hmem[64];
2897         char total_system_hmem[64];
2898         char used_memory_lua_hmem[64];
2899         char used_memory_rss_hmem[64];
2900         char maxmemory_hmem[64];
2901         size_t zmalloc_used = zmalloc_used_memory();
2902         size_t total_system_mem = server.system_memory_size;
2903         const char *evict_policy = maxmemoryToString();
2904         long long memory_lua = (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024;
2905 
2906         /* Peak memory is updated from time to time by serverCron() so it
2907          * may happen that the instantaneous value is slightly bigger than
2908          * the peak value. This may confuse users, so we update the peak
2909          * if found smaller than the current memory usage. */
2910         if (zmalloc_used > server.stat_peak_memory)
2911             server.stat_peak_memory = zmalloc_used;
2912 
2913         bytesToHuman(hmem,zmalloc_used);
2914         bytesToHuman(peak_hmem,server.stat_peak_memory);
2915         bytesToHuman(total_system_hmem,total_system_mem);
2916         bytesToHuman(used_memory_lua_hmem,memory_lua);
2917         bytesToHuman(used_memory_rss_hmem,server.resident_set_size);
2918         bytesToHuman(maxmemory_hmem,server.maxmemory);
2919 
2920         if (sections++) info = sdscat(info,"\r\n");
2921         info = sdscatprintf(info,
2922             "# Memory\r\n"
2923             "used_memory:%zu\r\n"
2924             "used_memory_human:%s\r\n"
2925             "used_memory_rss:%zu\r\n"
2926             "used_memory_rss_human:%s\r\n"
2927             "used_memory_peak:%zu\r\n"
2928             "used_memory_peak_human:%s\r\n"
2929             "total_system_memory:%lu\r\n"
2930             "total_system_memory_human:%s\r\n"
2931             "used_memory_lua:%lld\r\n"
2932             "used_memory_lua_human:%s\r\n"
2933             "maxmemory:%lld\r\n"
2934             "maxmemory_human:%s\r\n"
2935             "maxmemory_policy:%s\r\n"
2936             "mem_fragmentation_ratio:%.2f\r\n"
2937             "mem_allocator:%s\r\n",
2938             zmalloc_used,
2939             hmem,
2940             server.resident_set_size,
2941             used_memory_rss_hmem,
2942             server.stat_peak_memory,
2943             peak_hmem,
2944             (unsigned long)total_system_mem,
2945             total_system_hmem,
2946             memory_lua,
2947             used_memory_lua_hmem,
2948             server.maxmemory,
2949             maxmemory_hmem,
2950             evict_policy,
2951             zmalloc_get_fragmentation_ratio(server.resident_set_size),
2952             ZMALLOC_LIB
2953             );
2954     }
2955 
2956     /* Persistence */
2957     if (allsections || defsections || !strcasecmp(section,"persistence")) {
2958         if (sections++) info = sdscat(info,"\r\n");
2959         info = sdscatprintf(info,
2960             "# Persistence\r\n"
2961             "loading:%d\r\n"
2962             "rdb_changes_since_last_save:%lld\r\n"
2963             "rdb_bgsave_in_progress:%d\r\n"
2964             "rdb_last_save_time:%jd\r\n"
2965             "rdb_last_bgsave_status:%s\r\n"
2966             "rdb_last_bgsave_time_sec:%jd\r\n"
2967             "rdb_current_bgsave_time_sec:%jd\r\n"
2968             "aof_enabled:%d\r\n"
2969             "aof_rewrite_in_progress:%d\r\n"
2970             "aof_rewrite_scheduled:%d\r\n"
2971             "aof_last_rewrite_time_sec:%jd\r\n"
2972             "aof_current_rewrite_time_sec:%jd\r\n"
2973             "aof_last_bgrewrite_status:%s\r\n"
2974             "aof_last_write_status:%s\r\n",
2975             server.loading,
2976             server.dirty,
2977             server.rdb_child_pid != -1,
2978             (intmax_t)server.lastsave,
2979             (server.lastbgsave_status == C_OK) ? "ok" : "err",
2980             (intmax_t)server.rdb_save_time_last,
2981             (intmax_t)((server.rdb_child_pid == -1) ?
2982                 -1 : time(NULL)-server.rdb_save_time_start),
2983             server.aof_state != AOF_OFF,
2984             server.aof_child_pid != -1,
2985             server.aof_rewrite_scheduled,
2986             (intmax_t)server.aof_rewrite_time_last,
2987             (intmax_t)((server.aof_child_pid == -1) ?
2988                 -1 : time(NULL)-server.aof_rewrite_time_start),
2989             (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
2990             (server.aof_last_write_status == C_OK) ? "ok" : "err");
2991 
2992         if (server.aof_state != AOF_OFF) {
2993             info = sdscatprintf(info,
2994                 "aof_current_size:%lld\r\n"
2995                 "aof_base_size:%lld\r\n"
2996                 "aof_pending_rewrite:%d\r\n"
2997                 "aof_buffer_length:%zu\r\n"
2998                 "aof_rewrite_buffer_length:%lu\r\n"
2999                 "aof_pending_bio_fsync:%llu\r\n"
3000                 "aof_delayed_fsync:%lu\r\n",
3001                 (long long) server.aof_current_size,
3002                 (long long) server.aof_rewrite_base_size,
3003                 server.aof_rewrite_scheduled,
3004                 sdslen(server.aof_buf),
3005                 aofRewriteBufferSize(),
3006                 bioPendingJobsOfType(BIO_AOF_FSYNC),
3007                 server.aof_delayed_fsync);
3008         }
3009 
3010         if (server.loading) {
3011             double perc;
3012             time_t eta, elapsed;
3013             off_t remaining_bytes = server.loading_total_bytes-
3014                                     server.loading_loaded_bytes;
3015 
3016             perc = ((double)server.loading_loaded_bytes /
3017                    (server.loading_total_bytes+1)) * 100;
3018 
3019             elapsed = time(NULL)-server.loading_start_time;
3020             if (elapsed == 0) {
3021                 eta = 1; /* A fake 1 second figure if we don't have
3022                             enough info */
3023             } else {
3024                 eta = (elapsed*remaining_bytes)/(server.loading_loaded_bytes+1);
3025             }
3026 
3027             info = sdscatprintf(info,
3028                 "loading_start_time:%jd\r\n"
3029                 "loading_total_bytes:%llu\r\n"
3030                 "loading_loaded_bytes:%llu\r\n"
3031                 "loading_loaded_perc:%.2f\r\n"
3032                 "loading_eta_seconds:%jd\r\n",
3033                 (intmax_t) server.loading_start_time,
3034                 (unsigned long long) server.loading_total_bytes,
3035                 (unsigned long long) server.loading_loaded_bytes,
3036                 perc,
3037                 (intmax_t)eta
3038             );
3039         }
3040     }
3041 
3042     /* Stats */
3043     if (allsections || defsections || !strcasecmp(section,"stats")) {
3044         if (sections++) info = sdscat(info,"\r\n");
3045         info = sdscatprintf(info,
3046             "# Stats\r\n"
3047             "total_connections_received:%lld\r\n"
3048             "total_commands_processed:%lld\r\n"
3049             "instantaneous_ops_per_sec:%lld\r\n"
3050             "total_net_input_bytes:%lld\r\n"
3051             "total_net_output_bytes:%lld\r\n"
3052             "instantaneous_input_kbps:%.2f\r\n"
3053             "instantaneous_output_kbps:%.2f\r\n"
3054             "rejected_connections:%lld\r\n"
3055             "sync_full:%lld\r\n"
3056             "sync_partial_ok:%lld\r\n"
3057             "sync_partial_err:%lld\r\n"
3058             "expired_keys:%lld\r\n"
3059             "evicted_keys:%lld\r\n"
3060             "keyspace_hits:%lld\r\n"
3061             "keyspace_misses:%lld\r\n"
3062             "pubsub_channels:%ld\r\n"
3063             "pubsub_patterns:%lu\r\n"
3064             "latest_fork_usec:%lld\r\n"
3065             "migrate_cached_sockets:%ld\r\n",
3066             server.stat_numconnections,
3067             server.stat_numcommands,
3068             getInstantaneousMetric(STATS_METRIC_COMMAND),
3069             server.stat_net_input_bytes,
3070             server.stat_net_output_bytes,
3071             (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
3072             (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
3073             server.stat_rejected_conn,
3074             server.stat_sync_full,
3075             server.stat_sync_partial_ok,
3076             server.stat_sync_partial_err,
3077             server.stat_expiredkeys,
3078             server.stat_evictedkeys,
3079             server.stat_keyspace_hits,
3080             server.stat_keyspace_misses,
3081             dictSize(server.pubsub_channels),
3082             listLength(server.pubsub_patterns),
3083             server.stat_fork_time,
3084             dictSize(server.migrate_cached_sockets));
3085     }
3086 
3087     /* Replication */
3088     if (allsections || defsections || !strcasecmp(section,"replication")) {
3089         if (sections++) info = sdscat(info,"\r\n");
3090         info = sdscatprintf(info,
3091             "# Replication\r\n"
3092             "role:%s\r\n",
3093             server.masterhost == NULL ? "master" : "slave");
3094         if (server.masterhost) {
3095             long long slave_repl_offset = 1;
3096 
3097             if (server.master)
3098                 slave_repl_offset = server.master->reploff;
3099             else if (server.cached_master)
3100                 slave_repl_offset = server.cached_master->reploff;
3101 
3102             info = sdscatprintf(info,
3103                 "master_host:%s\r\n"
3104                 "master_port:%d\r\n"
3105                 "master_link_status:%s\r\n"
3106                 "master_last_io_seconds_ago:%d\r\n"
3107                 "master_sync_in_progress:%d\r\n"
3108                 "slave_repl_offset:%lld\r\n"
3109                 ,server.masterhost,
3110                 server.masterport,
3111                 (server.repl_state == REPL_STATE_CONNECTED) ?
3112                     "up" : "down",
3113                 server.master ?
3114                 ((int)(server.unixtime-server.master->lastinteraction)) : -1,
3115                 server.repl_state == REPL_STATE_TRANSFER,
3116                 slave_repl_offset
3117             );
3118 
3119             if (server.repl_state == REPL_STATE_TRANSFER) {
3120                 info = sdscatprintf(info,
3121                     "master_sync_left_bytes:%lld\r\n"
3122                     "master_sync_last_io_seconds_ago:%d\r\n"
3123                     , (long long)
3124                         (server.repl_transfer_size - server.repl_transfer_read),
3125                     (int)(server.unixtime-server.repl_transfer_lastio)
3126                 );
3127             }
3128 
3129             if (server.repl_state != REPL_STATE_CONNECTED) {
3130                 info = sdscatprintf(info,
3131                     "master_link_down_since_seconds:%jd\r\n",
3132                     (intmax_t)server.unixtime-server.repl_down_since);
3133             }
3134             info = sdscatprintf(info,
3135                 "slave_priority:%d\r\n"
3136                 "slave_read_only:%d\r\n",
3137                 server.slave_priority,
3138                 server.repl_slave_ro);
3139         }
3140 
3141         info = sdscatprintf(info,
3142             "connected_slaves:%lu\r\n",
3143             listLength(server.slaves));
3144 
3145         /* If min-slaves-to-write is active, write the number of slaves
3146          * currently considered 'good'. */
3147         if (server.repl_min_slaves_to_write &&
3148             server.repl_min_slaves_max_lag) {
3149             info = sdscatprintf(info,
3150                 "min_slaves_good_slaves:%d\r\n",
3151                 server.repl_good_slaves_count);
3152         }
3153 
3154         if (listLength(server.slaves)) {
3155             int slaveid = 0;
3156             listNode *ln;
3157             listIter li;
3158 
3159             listRewind(server.slaves,&li);
3160             while((ln = listNext(&li))) {
3161                 client *slave = listNodeValue(ln);
3162                 char *state = NULL;
3163                 char ip[NET_IP_STR_LEN];
3164                 int port;
3165                 long lag = 0;
3166 
3167                 if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1) continue;
3168                 switch(slave->replstate) {
3169                 case SLAVE_STATE_WAIT_BGSAVE_START:
3170                 case SLAVE_STATE_WAIT_BGSAVE_END:
3171                     state = "wait_bgsave";
3172                     break;
3173                 case SLAVE_STATE_SEND_BULK:
3174                     state = "send_bulk";
3175                     break;
3176                 case SLAVE_STATE_ONLINE:
3177                     state = "online";
3178                     break;
3179                 }
3180                 if (state == NULL) continue;
3181                 if (slave->replstate == SLAVE_STATE_ONLINE)
3182                     lag = time(NULL) - slave->repl_ack_time;
3183 
3184                 info = sdscatprintf(info,
3185                     "slave%d:ip=%s,port=%d,state=%s,"
3186                     "offset=%lld,lag=%ld\r\n",
3187                     slaveid,ip,slave->slave_listening_port,state,
3188                     slave->repl_ack_off, lag);
3189                 slaveid++;
3190             }
3191         }
3192         info = sdscatprintf(info,
3193             "master_repl_offset:%lld\r\n"
3194             "repl_backlog_active:%d\r\n"
3195             "repl_backlog_size:%lld\r\n"
3196             "repl_backlog_first_byte_offset:%lld\r\n"
3197             "repl_backlog_histlen:%lld\r\n",
3198             server.master_repl_offset,
3199             server.repl_backlog != NULL,
3200             server.repl_backlog_size,
3201             server.repl_backlog_off,
3202             server.repl_backlog_histlen);
3203     }
3204 
3205     /* CPU */
3206     if (allsections || defsections || !strcasecmp(section,"cpu")) {
3207         if (sections++) info = sdscat(info,"\r\n");
3208         info = sdscatprintf(info,
3209         "# CPU\r\n"
3210         "used_cpu_sys:%.2f\r\n"
3211         "used_cpu_user:%.2f\r\n"
3212         "used_cpu_sys_children:%.2f\r\n"
3213         "used_cpu_user_children:%.2f\r\n",
3214         (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000,
3215         (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000,
3216         (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
3217         (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000);
3218     }
3219 
3220     /* cmdtime */
3221     if (allsections || !strcasecmp(section,"commandstats")) {
3222         if (sections++) info = sdscat(info,"\r\n");
3223         info = sdscatprintf(info, "# Commandstats\r\n");
3224         numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
3225         for (j = 0; j < numcommands; j++) {
3226             struct redisCommand *c = redisCommandTable+j;
3227 
3228             if (!c->calls) continue;
3229             info = sdscatprintf(info,
3230                 "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n",
3231                 c->name, c->calls, c->microseconds,
3232                 (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls));
3233         }
3234     }
3235 
3236     /* Cluster */
3237     if (allsections || defsections || !strcasecmp(section,"cluster")) {
3238         if (sections++) info = sdscat(info,"\r\n");
3239         info = sdscatprintf(info,
3240         "# Cluster\r\n"
3241         "cluster_enabled:%d\r\n",
3242         server.cluster_enabled);
3243     }
3244 
3245     /* Key space */
3246     if (allsections || defsections || !strcasecmp(section,"keyspace")) {
3247         if (sections++) info = sdscat(info,"\r\n");
3248         info = sdscatprintf(info, "# Keyspace\r\n");
3249         for (j = 0; j < server.dbnum; j++) {
3250             long long keys, vkeys;
3251 
3252             keys = dictSize(server.db[j].dict);
3253             vkeys = dictSize(server.db[j].expires);
3254             if (keys || vkeys) {
3255                 info = sdscatprintf(info,
3256                     "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
3257                     j, keys, vkeys, server.db[j].avg_ttl);
3258             }
3259         }
3260     }
3261     return info;
3262 }
3263 
3264 void infoCommand(client *c) {
3265     char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
3266 
3267     if (c->argc > 2) {
3268         addReply(c,shared.syntaxerr);
3269         return;
3270     }
3271     addReplyBulkSds(c, genRedisInfoString(section));
3272 }
3273 
3274 void monitorCommand(client *c) {
3275     /* ignore MONITOR if already slave or in monitor mode */
3276     if (c->flags & CLIENT_SLAVE) return;
3277 
3278     c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
3279     listAddNodeTail(server.monitors,c);
3280     addReply(c,shared.ok);
3281 }
3282 
3283 /* ============================ Maxmemory directive  ======================== */
3284 
3285 /* freeMemoryIfNeeded() gets called when 'maxmemory' is set on the config
3286  * file to limit the max memory used by the server, before processing a
3287  * command.
3288  *
3289  * The goal of the function is to free enough memory to keep Redis under the
3290  * configured memory limit.
3291  *
3292  * The function starts calculating how many bytes should be freed to keep
3293  * Redis under the limit, and enters a loop selecting the best keys to
3294  * evict accordingly to the configured policy.
3295  *
3296  * If all the bytes needed to return back under the limit were freed the
3297  * function returns C_OK, otherwise C_ERR is returned, and the caller
3298  * should block the execution of commands that will result in more memory
3299  * used by the server.
3300  *
3301  * ------------------------------------------------------------------------
3302  *
3303  * LRU approximation algorithm
3304  *
3305  * Redis uses an approximation of the LRU algorithm that runs in constant
3306  * memory. Every time there is a key to expire, we sample N keys (with
3307  * N very small, usually in around 5) to populate a pool of best keys to
3308  * evict of M keys (the pool size is defined by MAXMEMORY_EVICTION_POOL_SIZE).
3309  *
3310  * The N keys sampled are added in the pool of good keys to expire (the one
3311  * with an old access time) if they are better than one of the current keys
3312  * in the pool.
3313  *
3314  * After the pool is populated, the best key we have in the pool is expired.
3315  * However note that we don't remove keys from the pool when they are deleted
3316  * so the pool may contain keys that no longer exist.
3317  *
3318  * When we try to evict a key, and all the entries in the pool don't exist
3319  * we populate it again. This time we'll be sure that the pool has at least
3320  * one key that can be evicted, if there is at least one key that can be
3321  * evicted in the whole database. */
3322 
3323 /* Create a new eviction pool. */
3324 struct evictionPoolEntry *evictionPoolAlloc(void) {
3325     struct evictionPoolEntry *ep;
3326     int j;
3327 
3328     ep = zmalloc(sizeof(*ep)*MAXMEMORY_EVICTION_POOL_SIZE);
3329     for (j = 0; j < MAXMEMORY_EVICTION_POOL_SIZE; j++) {
3330         ep[j].idle = 0;
3331         ep[j].key = NULL;
3332     }
3333     return ep;
3334 }
3335 
3336 /* This is an helper function for freeMemoryIfNeeded(), it is used in order
3337  * to populate the evictionPool with a few entries every time we want to
3338  * expire a key. Keys with idle time smaller than one of the current
3339  * keys are added. Keys are always added if there are free entries.
3340  *
3341  * We insert keys on place in ascending order, so keys with the smaller
3342  * idle time are on the left, and keys with the higher idle time on the
3343  * right. */
3344 
3345 #define EVICTION_SAMPLES_ARRAY_SIZE 16
3346 void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
3347     int j, k, count;
3348     dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
3349     dictEntry **samples;
3350 
3351     /* Try to use a static buffer: this function is a big hit...
3352      * Note: it was actually measured that this helps. */
3353     if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
3354         samples = _samples;
3355     } else {
3356         samples = zmalloc(sizeof(samples[0])*server.maxmemory_samples);
3357     }
3358 
3359     count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
3360     for (j = 0; j < count; j++) {
3361         unsigned long long idle;
3362         sds key;
3363         robj *o;
3364         dictEntry *de;
3365 
3366         de = samples[j];
3367         key = dictGetKey(de);
3368         /* If the dictionary we are sampling from is not the main
3369          * dictionary (but the expires one) we need to lookup the key
3370          * again in the key dictionary to obtain the value object. */
3371         if (sampledict != keydict) de = dictFind(keydict, key);
3372         o = dictGetVal(de);
3373         idle = estimateObjectIdleTime(o);
3374 
3375         /* Insert the element inside the pool.
3376          * First, find the first empty bucket or the first populated
3377          * bucket that has an idle time smaller than our idle time. */
3378         k = 0;
3379         while (k < MAXMEMORY_EVICTION_POOL_SIZE &&
3380                pool[k].key &&
3381                pool[k].idle < idle) k++;
3382         if (k == 0 && pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key != NULL) {
3383             /* Can't insert if the element is < the worst element we have
3384              * and there are no empty buckets. */
3385             continue;
3386         } else if (k < MAXMEMORY_EVICTION_POOL_SIZE && pool[k].key == NULL) {
3387             /* Inserting into empty position. No setup needed before insert. */
3388         } else {
3389             /* Inserting in the middle. Now k points to the first element
3390              * greater than the element to insert.  */
3391             if (pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key == NULL) {
3392                 /* Free space on the right? Insert at k shifting
3393                  * all the elements from k to end to the right. */
3394                 memmove(pool+k+1,pool+k,
3395                     sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
3396             } else {
3397                 /* No free space on right? Insert at k-1 */
3398                 k--;
3399                 /* Shift all elements on the left of k (included) to the
3400                  * left, so we discard the element with smaller idle time. */
3401                 sdsfree(pool[0].key);
3402                 memmove(pool,pool+1,sizeof(pool[0])*k);
3403             }
3404         }
3405         pool[k].key = sdsdup(key);
3406         pool[k].idle = idle;
3407     }
3408     if (samples != _samples) zfree(samples);
3409 }
3410 
3411 int freeMemoryIfNeeded(void) {
3412     size_t mem_used, mem_tofree, mem_freed;
3413     int slaves = listLength(server.slaves);
3414     mstime_t latency, eviction_latency;
3415 
3416     /* Remove the size of slaves output buffers and AOF buffer from the
3417      * count of used memory. */
3418     mem_used = zmalloc_used_memory();
3419     if (slaves) {
3420         listIter li;
3421         listNode *ln;
3422 
3423         listRewind(server.slaves,&li);
3424         while((ln = listNext(&li))) {
3425             client *slave = listNodeValue(ln);
3426             unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
3427             if (obuf_bytes > mem_used)
3428                 mem_used = 0;
3429             else
3430                 mem_used -= obuf_bytes;
3431         }
3432     }
3433     if (server.aof_state != AOF_OFF) {
3434         mem_used -= sdslen(server.aof_buf);
3435         mem_used -= aofRewriteBufferSize();
3436     }
3437 
3438     /* Check if we are over the memory limit. */
3439     if (mem_used <= server.maxmemory) return C_OK;
3440 
3441     if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
3442         return C_ERR; /* We need to free memory, but policy forbids. */
3443 
3444     /* Compute how much memory we need to free. */
3445     mem_tofree = mem_used - server.maxmemory;
3446     mem_freed = 0;
3447     latencyStartMonitor(latency);
3448     while (mem_freed < mem_tofree) {
3449         int j, k, keys_freed = 0;
3450 
3451         for (j = 0; j < server.dbnum; j++) {
3452             long bestval = 0; /* just to prevent warning */
3453             sds bestkey = NULL;
3454             dictEntry *de;
3455             redisDb *db = server.db+j;
3456             dict *dict;
3457 
3458             if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
3459                 server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
3460             {
3461                 dict = server.db[j].dict;
3462             } else {
3463                 dict = server.db[j].expires;
3464             }
3465             if (dictSize(dict) == 0) continue;
3466 
3467             /* volatile-random and allkeys-random policy */
3468             if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
3469                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
3470             {
3471                 de = dictGetRandomKey(dict);
3472                 bestkey = dictGetKey(de);
3473             }
3474 
3475             /* volatile-lru and allkeys-lru policy */
3476             else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
3477                 server.maxmemory_policy == MAXMEMORY_VOLATILE_LRU)
3478             {
3479                 struct evictionPoolEntry *pool = db->eviction_pool;
3480 
3481                 while(bestkey == NULL) {
3482                     evictionPoolPopulate(dict, db->dict, db->eviction_pool);
3483                     /* Go backward from best to worst element to evict. */
3484                     for (k = MAXMEMORY_EVICTION_POOL_SIZE-1; k >= 0; k--) {
3485                         if (pool[k].key == NULL) continue;
3486                         de = dictFind(dict,pool[k].key);
3487 
3488                         /* Remove the entry from the pool. */
3489                         sdsfree(pool[k].key);
3490                         /* Shift all elements on its right to left. */
3491                         memmove(pool+k,pool+k+1,
3492                             sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
3493                         /* Clear the element on the right which is empty
3494                          * since we shifted one position to the left.  */
3495                         pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key = NULL;
3496                         pool[MAXMEMORY_EVICTION_POOL_SIZE-1].idle = 0;
3497 
3498                         /* If the key exists, is our pick. Otherwise it is
3499                          * a ghost and we need to try the next element. */
3500                         if (de) {
3501                             bestkey = dictGetKey(de);
3502                             break;
3503                         } else {
3504                             /* Ghost... */
3505                             continue;
3506                         }
3507                     }
3508                 }
3509             }
3510 
3511             /* volatile-ttl */
3512             else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
3513                 for (k = 0; k < server.maxmemory_samples; k++) {
3514                     sds thiskey;
3515                     long thisval;
3516 
3517                     de = dictGetRandomKey(dict);
3518                     thiskey = dictGetKey(de);
3519                     thisval = (long) dictGetVal(de);
3520 
3521                     /* Expire sooner (minor expire unix timestamp) is better
3522                      * candidate for deletion */
3523                     if (bestkey == NULL || thisval < bestval) {
3524                         bestkey = thiskey;
3525                         bestval = thisval;
3526                     }
3527                 }
3528             }
3529 
3530             /* Finally remove the selected key. */
3531             if (bestkey) {
3532                 long long delta;
3533 
3534                 robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
3535                 propagateExpire(db,keyobj);
3536                 /* We compute the amount of memory freed by dbDelete() alone.
3537                  * It is possible that actually the memory needed to propagate
3538                  * the DEL in AOF and replication link is greater than the one
3539                  * we are freeing removing the key, but we can't account for
3540                  * that otherwise we would never exit the loop.
3541                  *
3542                  * AOF and Output buffer memory will be freed eventually so
3543                  * we only care about memory used by the key space. */
3544                 delta = (long long) zmalloc_used_memory();
3545                 latencyStartMonitor(eviction_latency);
3546                 dbDelete(db,keyobj);
3547                 latencyEndMonitor(eviction_latency);
3548                 latencyAddSampleIfNeeded("eviction-del",eviction_latency);
3549                 latencyRemoveNestedEvent(latency,eviction_latency);
3550                 delta -= (long long) zmalloc_used_memory();
3551                 mem_freed += delta;
3552                 server.stat_evictedkeys++;
3553                 notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
3554                     keyobj, db->id);
3555                 decrRefCount(keyobj);
3556                 keys_freed++;
3557 
3558                 /* When the memory to free starts to be big enough, we may
3559                  * start spending so much time here that is impossible to
3560                  * deliver data to the slaves fast enough, so we force the
3561                  * transmission here inside the loop. */
3562                 if (slaves) flushSlavesOutputBuffers();
3563             }
3564         }
3565         if (!keys_freed) {
3566             latencyEndMonitor(latency);
3567             latencyAddSampleIfNeeded("eviction-cycle",latency);
3568             return C_ERR; /* nothing to free... */
3569         }
3570     }
3571     latencyEndMonitor(latency);
3572     latencyAddSampleIfNeeded("eviction-cycle",latency);
3573     return C_OK;
3574 }
3575 
3576 /* =================================== Main! ================================ */
3577 
3578 #ifdef __linux__
3579 int linuxOvercommitMemoryValue(void) {
3580     FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
3581     char buf[64];
3582 
3583     if (!fp) return -1;
3584     if (fgets(buf,64,fp) == NULL) {
3585         fclose(fp);
3586         return -1;
3587     }
3588     fclose(fp);
3589 
3590     return atoi(buf);
3591 }
3592 
3593 void linuxMemoryWarnings(void) {
3594     if (linuxOvercommitMemoryValue() == 0) {
3595         serverLog(LL_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
3596     }
3597     if (THPIsEnabled()) {
3598         serverLog(LL_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.");
3599     }
3600 }
3601 #endif /* __linux__ */
3602 
3603 void createPidFile(void) {
3604     /* If pidfile requested, but no pidfile defined, use
3605      * default pidfile path */
3606     if (!server.pidfile) server.pidfile = zstrdup(CONFIG_DEFAULT_PID_FILE);
3607 
3608     /* Try to write the pid file in a best-effort way. */
3609     FILE *fp = fopen(server.pidfile,"w");
3610     if (fp) {
3611         fprintf(fp,"%d\n",(int)getpid());
3612         fclose(fp);
3613     }
3614 }
3615 
3616 void daemonize(void) {
3617     int fd;
3618 
3619     if (fork() != 0) exit(0); /* parent exits */
3620     setsid(); /* create a new session */
3621 
3622     /* Every output goes to /dev/null. If Redis is daemonized but
3623      * the 'logfile' is set to 'stdout' in the configuration file
3624      * it will not log at all. */
3625     if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
3626         dup2(fd, STDIN_FILENO);
3627         dup2(fd, STDOUT_FILENO);
3628         dup2(fd, STDERR_FILENO);
3629         if (fd > STDERR_FILENO) close(fd);
3630     }
3631 }
3632 
3633 void version(void) {
3634     printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n",
3635         REDIS_VERSION,
3636         redisGitSHA1(),
3637         atoi(redisGitDirty()) > 0,
3638         ZMALLOC_LIB,
3639         sizeof(long) == 4 ? 32 : 64,
3640         (unsigned long long) redisBuildId());
3641     exit(0);
3642 }
3643 
3644 void usage(void) {
3645     fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf] [options]\n");
3646     fprintf(stderr,"       ./redis-server - (read config from stdin)\n");
3647     fprintf(stderr,"       ./redis-server -v or --version\n");
3648     fprintf(stderr,"       ./redis-server -h or --help\n");
3649     fprintf(stderr,"       ./redis-server --test-memory <megabytes>\n\n");
3650     fprintf(stderr,"Examples:\n");
3651     fprintf(stderr,"       ./redis-server (run the server with default conf)\n");
3652     fprintf(stderr,"       ./redis-server /etc/redis/6379.conf\n");
3653     fprintf(stderr,"       ./redis-server --port 7777\n");
3654     fprintf(stderr,"       ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n");
3655     fprintf(stderr,"       ./redis-server /etc/myredis.conf --loglevel verbose\n\n");
3656     fprintf(stderr,"Sentinel mode:\n");
3657     fprintf(stderr,"       ./redis-server /etc/sentinel.conf --sentinel\n");
3658     exit(1);
3659 }
3660 
3661 void redisAsciiArt(void) {
3662 #include "asciilogo.h"
3663     char *buf = zmalloc(1024*16);
3664     char *mode;
3665 
3666     if (server.cluster_enabled) mode = "cluster";
3667     else if (server.sentinel_mode) mode = "sentinel";
3668     else mode = "standalone";
3669 
3670     if (server.syslog_enabled) {
3671         serverLog(LL_NOTICE,
3672             "Redis %s (%s/%d) %s bit, %s mode, port %d, pid %ld ready to start.",
3673             REDIS_VERSION,
3674             redisGitSHA1(),
3675             strtol(redisGitDirty(),NULL,10) > 0,
3676             (sizeof(long) == 8) ? "64" : "32",
3677             mode, server.port,
3678             (long) getpid()
3679         );
3680     } else {
3681         snprintf(buf,1024*16,ascii_logo,
3682             REDIS_VERSION,
3683             redisGitSHA1(),
3684             strtol(redisGitDirty(),NULL,10) > 0,
3685             (sizeof(long) == 8) ? "64" : "32",
3686             mode, server.port,
3687             (long) getpid()
3688         );
3689         serverLogRaw(LL_NOTICE|LL_RAW,buf);
3690     }
3691     zfree(buf);
3692 }
3693 
3694 static void sigShutdownHandler(int sig) {
3695     char *msg;
3696 
3697     switch (sig) {
3698     case SIGINT:
3699         msg = "Received SIGINT scheduling shutdown...";
3700         break;
3701     case SIGTERM:
3702         msg = "Received SIGTERM scheduling shutdown...";
3703         break;
3704     default:
3705         msg = "Received shutdown signal, scheduling shutdown...";
3706     };
3707 
3708     /* SIGINT is often delivered via Ctrl+C in an interactive session.
3709      * If we receive the signal the second time, we interpret this as
3710      * the user really wanting to quit ASAP without waiting to persist
3711      * on disk. */
3712     if (server.shutdown_asap && sig == SIGINT) {
3713         serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
3714         rdbRemoveTempFile(getpid());
3715         exit(1); /* Exit with an error since this was not a clean shutdown. */
3716     } else if (server.loading) {
3717         exit(0);
3718     }
3719 
3720     serverLogFromHandler(LL_WARNING, msg);
3721     server.shutdown_asap = 1;
3722 }
3723 
3724 void setupSignalHandlers(void) {
3725     struct sigaction act;
3726 
3727     /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
3728      * Otherwise, sa_handler is used. */
3729     sigemptyset(&act.sa_mask);
3730     act.sa_flags = 0;
3731     act.sa_handler = sigShutdownHandler;
3732     sigaction(SIGTERM, &act, NULL);
3733     sigaction(SIGINT, &act, NULL);
3734 
3735 #ifdef HAVE_BACKTRACE
3736     sigemptyset(&act.sa_mask);
3737     act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
3738     act.sa_sigaction = sigsegvHandler;
3739     sigaction(SIGSEGV, &act, NULL);
3740     sigaction(SIGBUS, &act, NULL);
3741     sigaction(SIGFPE, &act, NULL);
3742     sigaction(SIGILL, &act, NULL);
3743 #endif
3744     return;
3745 }
3746 
3747 void memtest(size_t megabytes, int passes);
3748 
3749 /* Returns 1 if there is --sentinel among the arguments or if
3750  * argv[0] is exactly "redis-sentinel". */
3751 int checkForSentinelMode(int argc, char **argv) {
3752     int j;
3753 
3754     if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
3755     for (j = 1; j < argc; j++)
3756         if (!strcmp(argv[j],"--sentinel")) return 1;
3757     return 0;
3758 }
3759 
3760 /* Function called at startup to load RDB or AOF file in memory. */
3761 void loadDataFromDisk(void) {
3762     long long start = ustime();
3763     if (server.aof_state == AOF_ON) {
3764         if (loadAppendOnlyFile(server.aof_filename) == C_OK)
3765             serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
3766     } else {
3767         if (rdbLoad(server.rdb_filename) == C_OK) {
3768             serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
3769                 (float)(ustime()-start)/1000000);
3770         } else if (errno != ENOENT) {
3771             serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
3772             exit(1);
3773         }
3774     }
3775 }
3776 
3777 void redisOutOfMemoryHandler(size_t allocation_size) {
3778     serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!",
3779         allocation_size);
3780     serverPanic("Redis aborting for OUT OF MEMORY");
3781 }
3782 
3783 void redisSetProcTitle(char *title) {
3784 #ifdef USE_SETPROCTITLE
3785     char *server_mode = "";
3786     if (server.cluster_enabled) server_mode = " [cluster]";
3787     else if (server.sentinel_mode) server_mode = " [sentinel]";
3788 
3789     setproctitle("%s %s:%d%s",
3790         title,
3791         server.bindaddr_count ? server.bindaddr[0] : "*",
3792         server.port,
3793         server_mode);
3794 #else
3795     UNUSED(title);
3796 #endif
3797 }
3798 
3799 /*
3800  * Check whether systemd or upstart have been used to start redis.
3801  */
3802 
3803 int redisSupervisedUpstart(void) {
3804     const char *upstart_job = getenv("UPSTART_JOB");
3805 
3806     if (!upstart_job) {
3807         serverLog(LL_WARNING,
3808                 "upstart supervision requested, but UPSTART_JOB not found");
3809         return 0;
3810     }
3811 
3812     serverLog(LL_NOTICE, "supervised by upstart, will stop to signal readyness");
3813     raise(SIGSTOP);
3814     unsetenv("UPSTART_JOB");
3815     return 1;
3816 }
3817 
3818 int redisSupervisedSystemd(void) {
3819     const char *notify_socket = getenv("NOTIFY_SOCKET");
3820     int fd = 1;
3821     struct sockaddr_un su;
3822     struct iovec iov;
3823     struct msghdr hdr;
3824     int sendto_flags = 0;
3825 
3826     if (!notify_socket) {
3827         serverLog(LL_WARNING,
3828                 "systemd supervision requested, but NOTIFY_SOCKET not found");
3829         return 0;
3830     }
3831 
3832     if ((strchr("@/", notify_socket[0])) == NULL || strlen(notify_socket) < 2) {
3833         return 0;
3834     }
3835 
3836     serverLog(LL_NOTICE, "supervised by systemd, will signal readyness");
3837     if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) {
3838         serverLog(LL_WARNING,
3839                 "Can't connect to systemd socket %s", notify_socket);
3840         return 0;
3841     }
3842 
3843     memset(&su, 0, sizeof(su));
3844     su.sun_family = AF_UNIX;
3845     strncpy (su.sun_path, notify_socket, sizeof(su.sun_path) -1);
3846     su.sun_path[sizeof(su.sun_path) - 1] = '\0';
3847 
3848     if (notify_socket[0] == '@')
3849         su.sun_path[0] = '\0';
3850 
3851     memset(&iov, 0, sizeof(iov));
3852     iov.iov_base = "READY=1";
3853     iov.iov_len = strlen("READY=1");
3854 
3855     memset(&hdr, 0, sizeof(hdr));
3856     hdr.msg_name = &su;
3857     hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) +
3858         strlen(notify_socket);
3859     hdr.msg_iov = &iov;
3860     hdr.msg_iovlen = 1;
3861 
3862     unsetenv("NOTIFY_SOCKET");
3863 #ifdef HAVE_MSG_NOSIGNAL
3864     sendto_flags |= MSG_NOSIGNAL;
3865 #endif
3866     if (sendmsg(fd, &hdr, sendto_flags) < 0) {
3867         serverLog(LL_WARNING, "Can't send notification to systemd");
3868         close(fd);
3869         return 0;
3870     }
3871     close(fd);
3872     return 1;
3873 }
3874 
3875 int redisIsSupervised(int mode) {
3876     if (mode == SUPERVISED_AUTODETECT) {
3877         const char *upstart_job = getenv("UPSTART_JOB");
3878         const char *notify_socket = getenv("NOTIFY_SOCKET");
3879 
3880         if (upstart_job) {
3881             redisSupervisedUpstart();
3882         } else if (notify_socket) {
3883             redisSupervisedSystemd();
3884         }
3885     } else if (mode == SUPERVISED_UPSTART) {
3886         return redisSupervisedUpstart();
3887     } else if (mode == SUPERVISED_SYSTEMD) {
3888         return redisSupervisedSystemd();
3889     }
3890 
3891     return 0;
3892 }
3893 
3894 
3895 int main(int argc, char **argv) {
3896     struct timeval tv;
3897     int j;
3898 
3899 #ifdef REDIS_TEST
3900     if (argc == 3 && !strcasecmp(argv[1], "test")) {
3901         if (!strcasecmp(argv[2], "ziplist")) {
3902             return ziplistTest(argc, argv);
3903         } else if (!strcasecmp(argv[2], "quicklist")) {
3904             quicklistTest(argc, argv);
3905         } else if (!strcasecmp(argv[2], "intset")) {
3906             return intsetTest(argc, argv);
3907         } else if (!strcasecmp(argv[2], "zipmap")) {
3908             return zipmapTest(argc, argv);
3909         } else if (!strcasecmp(argv[2], "sha1test")) {
3910             return sha1Test(argc, argv);
3911         } else if (!strcasecmp(argv[2], "util")) {
3912             return utilTest(argc, argv);
3913         } else if (!strcasecmp(argv[2], "sds")) {
3914             return sdsTest(argc, argv);
3915         } else if (!strcasecmp(argv[2], "endianconv")) {
3916             return endianconvTest(argc, argv);
3917         } else if (!strcasecmp(argv[2], "crc64")) {
3918             return crc64Test(argc, argv);
3919         }
3920 
3921         return -1; /* test not found */
3922     }
3923 #endif
3924 
3925     /* We need to initialize our libraries, and the server configuration. */
3926 #ifdef INIT_SETPROCTITLE_REPLACEMENT
3927     spt_init(argc, argv);
3928 #endif
3929     setlocale(LC_COLLATE,"");
3930     zmalloc_enable_thread_safeness();
3931     zmalloc_set_oom_handler(redisOutOfMemoryHandler);
3932     srand(time(NULL)^getpid());
3933     gettimeofday(&tv,NULL);
3934     dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
3935     server.sentinel_mode = checkForSentinelMode(argc,argv);
3936     initServerConfig();
3937 
3938     /* Store the executable path and arguments in a safe place in order
3939      * to be able to restart the server later. */
3940     server.executable = getAbsolutePath(argv[0]);
3941     server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
3942     server.exec_argv[argc] = NULL;
3943     for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
3944 
3945     /* We need to init sentinel right now as parsing the configuration file
3946      * in sentinel mode will have the effect of populating the sentinel
3947      * data structures with master nodes to monitor. */
3948     if (server.sentinel_mode) {
3949         initSentinelConfig();
3950         initSentinel();
3951     }
3952 
3953     /* Check if we need to start in redis-check-rdb mode. We just execute
3954      * the program main. However the program is part of the Redis executable
3955      * so that we can easily execute an RDB check on loading errors. */
3956     if (strstr(argv[0],"redis-check-rdb") != NULL)
3957         exit(redis_check_rdb_main(argv,argc));
3958 
3959     if (argc >= 2) {
3960         j = 1; /* First option to parse in argv[] */
3961         sds options = sdsempty();
3962         char *configfile = NULL;
3963 
3964         /* Handle special options --help and --version */
3965         if (strcmp(argv[1], "-v") == 0 ||
3966             strcmp(argv[1], "--version") == 0) version();
3967         if (strcmp(argv[1], "--help") == 0 ||
3968             strcmp(argv[1], "-h") == 0) usage();
3969         if (strcmp(argv[1], "--test-memory") == 0) {
3970             if (argc == 3) {
3971                 memtest(atoi(argv[2]),50);
3972                 exit(0);
3973             } else {
3974                 fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
3975                 fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
3976                 exit(1);
3977             }
3978         }
3979 
3980         /* First argument is the config file name? */
3981         if (argv[j][0] != '-' || argv[j][1] != '-') {
3982             configfile = argv[j];
3983             server.configfile = getAbsolutePath(configfile);
3984             /* Replace the config file in server.exec_argv with
3985              * its absoulte path. */
3986             zfree(server.exec_argv[j]);
3987             server.exec_argv[j] = zstrdup(server.configfile);
3988             j++;
3989         }
3990 
3991         /* All the other options are parsed and conceptually appended to the
3992          * configuration file. For instance --port 6380 will generate the
3993          * string "port 6380\n" to be parsed after the actual file name
3994          * is parsed, if any. */
3995         while(j != argc) {
3996             if (argv[j][0] == '-' && argv[j][1] == '-') {
3997                 /* Option name */
3998                 if (!strcmp(argv[j], "--check-rdb")) {
3999                     /* Argument has no options, need to skip for parsing. */
4000                     j++;
4001                     continue;
4002                 }
4003                 if (sdslen(options)) options = sdscat(options,"\n");
4004                 options = sdscat(options,argv[j]+2);
4005                 options = sdscat(options," ");
4006             } else {
4007                 /* Option argument */
4008                 options = sdscatrepr(options,argv[j],strlen(argv[j]));
4009                 options = sdscat(options," ");
4010             }
4011             j++;
4012         }
4013         if (server.sentinel_mode && configfile && *configfile == '-') {
4014             serverLog(LL_WARNING,
4015                 "Sentinel config from STDIN not allowed.");
4016             serverLog(LL_WARNING,
4017                 "Sentinel needs config file on disk to save state.  Exiting...");
4018             exit(1);
4019         }
4020         resetServerSaveParams();
4021         loadServerConfig(configfile,options);
4022         sdsfree(options);
4023     } else {
4024         serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
4025     }
4026 
4027     server.supervised = redisIsSupervised(server.supervised_mode);
4028     int background = server.daemonize && !server.supervised;
4029     if (background) daemonize();
4030 
4031     initServer();
4032     if (background || server.pidfile) createPidFile();
4033     redisSetProcTitle(argv[0]);
4034     redisAsciiArt();
4035     checkTcpBacklogSettings();
4036 
4037     if (!server.sentinel_mode) {
4038         /* Things not needed when running in Sentinel mode. */
4039         serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION);
4040     #ifdef __linux__
4041         linuxMemoryWarnings();
4042     #endif
4043         loadDataFromDisk();
4044         if (server.cluster_enabled) {
4045             if (verifyClusterConfigWithData() == C_ERR) {
4046                 serverLog(LL_WARNING,
4047                     "You can't have keys in a DB different than DB 0 when in "
4048                     "Cluster mode. Exiting.");
4049                 exit(1);
4050             }
4051         }
4052         if (server.ipfd_count > 0)
4053             serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4054         if (server.sofd > 0)
4055             serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
4056     } else {
4057         sentinelIsRunning();
4058     }
4059 
4060     /* Warning the user about suspicious maxmemory setting. */
4061     if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
4062         serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
4063     }
4064 
4065     aeSetBeforeSleepProc(server.el,beforeSleep);
4066     aeMain(server.el);
4067     aeDeleteEventLoop(server.el);
4068     return 0;
4069 }
4070 
4071 /* The End */
4072