xref: /redis-3.2.3/src/server.c (revision b4e5e2ec)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "cluster.h"
32 #include "slowlog.h"
33 #include "bio.h"
34 #include "latency.h"
35 
36 #include <time.h>
37 #include <signal.h>
38 #include <sys/wait.h>
39 #include <errno.h>
40 #include <assert.h>
41 #include <ctype.h>
42 #include <stdarg.h>
43 #include <arpa/inet.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46 #include <sys/time.h>
47 #include <sys/resource.h>
48 #include <sys/uio.h>
49 #include <sys/un.h>
50 #include <limits.h>
51 #include <float.h>
52 #include <math.h>
53 #include <sys/resource.h>
54 #include <sys/utsname.h>
55 #include <locale.h>
56 #include <sys/socket.h>
57 
/* Our shared "common" objects: the single global instance of
 * struct sharedObjectsStruct, defined here. */

struct sharedObjectsStruct shared;
61 
/* Global variables that are actually used as constants. The following double
 * values are used for the on-disk serialization of doubles, and are
 * initialized at runtime (instead of with static initializers) to avoid
 * strange compiler optimizations. */

double R_Zero, R_PosInf, R_NegInf, R_Nan;
67 
68 /*================================= Globals ================================= */
69 
/* Global vars: the one instance holding the state of the server. It is read
 * directly by the code in this file (for example server.verbosity,
 * server.logfile and server.db). */
struct redisServer server; /* server global state */
72 
73 /* Our command table.
74  *
75  * Every entry is composed of the following fields:
76  *
77  * name: a string representing the command name.
78  * function: pointer to the C function implementing the command.
79  * arity: number of arguments, it is possible to use -N to say >= N
80  * sflags: command flags as string. See below for a table of flags.
81  * flags: flags as bitmask. Computed by Redis using the 'sflags' field.
82  * get_keys_proc: an optional function to get key arguments from a command.
83  *                This is only used when the following three fields are not
84  *                enough to specify what arguments are keys.
85  * first_key_index: first argument that is a key
86  * last_key_index: last argument that is a key
87  * key_step: step to get all the keys from first to last argument. For instance
88  *           in MSET the step is two since arguments are key,val,key,val,...
89  * microseconds: microseconds of total execution time for this command.
90  * calls: total number of calls of this command.
91  *
92  * The flags, microseconds and calls fields are computed by Redis and should
93  * always be set to zero.
94  *
95  * Command flags are expressed using strings where every character represents
96  * a flag. Later the populateCommandTable() function will take care of
97  * populating the real 'flags' field using this characters.
98  *
99  * This is the meaning of the flags:
100  *
101  * w: write command (may modify the key space).
102  * r: read command  (will never modify the key space).
103  * m: may increase memory usage once called. Don't allow if out of memory.
104  * a: admin command, like SAVE or SHUTDOWN.
105  * p: Pub/Sub related command.
106  * f: force replication of this command, regardless of server.dirty.
107  * s: command not allowed in scripts.
108  * R: random command. Command is not deterministic, that is, the same command
109  *    with the same arguments, with the same key space, may have different
110  *    results. For instance SPOP and RANDOMKEY are two random commands.
111  * S: Sort command output array if called from script, so that the output
112  *    is deterministic.
113  * l: Allow command while loading the database.
114  * t: Allow command while a slave has stale data but is not allowed to
115  *    server this data. Normally no command is accepted in this condition
116  *    but just a few.
117  * M: Do not automatically propagate the command on MONITOR.
118  * k: Perform an implicit ASKING for this command, so the command will be
119  *    accepted in cluster mode if the slot is marked as 'importing'.
120  * F: Fast command: O(1) or O(log(N)) command that should never delay
121  *    its execution as long as the kernel scheduler is giving us time.
122  *    Note that commands that may trigger a DEL as a side effect (like SET)
123  *    are not fast commands.
124  */
125 struct redisCommand redisCommandTable[] = {
126     {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
127     {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
128     {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
129     {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
130     {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
131     {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
132     {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
133     {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
134     {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
135     {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
136     {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
137     {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0},
138     {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
139     {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
140     {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
141     {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
142     {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
143     {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
144     {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
145     {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
146     {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
147     {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
148     {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
149     {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
150     {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
151     {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
152     {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
153     {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
154     {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
155     {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
156     {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
157     {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
158     {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
159     {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
160     {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
161     {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
162     {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
163     {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
164     {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
165     {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
166     {"spop",spopCommand,-2,"wRF",0,NULL,1,1,1,0,0},
167     {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
168     {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
169     {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
170     {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
171     {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
172     {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
173     {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
174     {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
175     {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
176     {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
177     {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
178     {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0},
179     {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0},
180     {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0},
181     {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0},
182     {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
183     {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
184     {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
185     {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
186     {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
187     {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
188     {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
189     {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0},
190     {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0},
191     {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
192     {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0},
193     {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0},
194     {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
195     {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
196     {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
197     {"hset",hsetCommand,4,"wmF",0,NULL,1,1,1,0,0},
198     {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
199     {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
200     {"hmset",hmsetCommand,-4,"wm",0,NULL,1,1,1,0,0},
201     {"hmget",hmgetCommand,-3,"r",0,NULL,1,1,1,0,0},
202     {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
203     {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0},
204     {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
205     {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0},
206     {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0},
207     {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
208     {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
209     {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0},
210     {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0},
211     {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
212     {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
213     {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
214     {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0},
215     {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0},
216     {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
217     {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
218     {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
219     {"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0},
220     {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0},
221     {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0},
222     {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0},
223     {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0},
224     {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0},
225     {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0},
226     {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0},
227     {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
228     {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0},
229     {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0},
230     {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0},
231     {"ping",pingCommand,-1,"tF",0,NULL,0,0,0,0,0},
232     {"echo",echoCommand,2,"F",0,NULL,0,0,0,0,0},
233     {"save",saveCommand,1,"as",0,NULL,0,0,0,0,0},
234     {"bgsave",bgsaveCommand,1,"a",0,NULL,0,0,0,0,0},
235     {"bgrewriteaof",bgrewriteaofCommand,1,"a",0,NULL,0,0,0,0,0},
236     {"shutdown",shutdownCommand,-1,"alt",0,NULL,0,0,0,0,0},
237     {"lastsave",lastsaveCommand,1,"RF",0,NULL,0,0,0,0,0},
238     {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0},
239     {"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0},
240     {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
241     {"discard",discardCommand,1,"sF",0,NULL,0,0,0,0,0},
242     {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
243     {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
244     {"replconf",replconfCommand,-1,"aslt",0,NULL,0,0,0,0,0},
245     {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
246     {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
247     {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0},
248     {"info",infoCommand,-1,"lt",0,NULL,0,0,0,0,0},
249     {"monitor",monitorCommand,1,"as",0,NULL,0,0,0,0,0},
250     {"ttl",ttlCommand,2,"rF",0,NULL,1,1,1,0,0},
251     {"pttl",pttlCommand,2,"rF",0,NULL,1,1,1,0,0},
252     {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
253     {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
254     {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
255     {"debug",debugCommand,-1,"as",0,NULL,0,0,0,0,0},
256     {"config",configCommand,-2,"lat",0,NULL,0,0,0,0,0},
257     {"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
258     {"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
259     {"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
260     {"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
261     {"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0},
262     {"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},
263     {"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0},
264     {"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0},
265     {"cluster",clusterCommand,-2,"a",0,NULL,0,0,0,0,0},
266     {"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
267     {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
268     {"migrate",migrateCommand,-6,"w",0,migrateGetKeys,0,0,0,0,0},
269     {"asking",askingCommand,1,"F",0,NULL,0,0,0,0,0},
270     {"readonly",readonlyCommand,1,"F",0,NULL,0,0,0,0,0},
271     {"readwrite",readwriteCommand,1,"F",0,NULL,0,0,0,0,0},
272     {"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0},
273     {"object",objectCommand,3,"r",0,NULL,2,2,2,0,0},
274     {"client",clientCommand,-2,"as",0,NULL,0,0,0,0,0},
275     {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
276     {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
277     {"slowlog",slowlogCommand,-2,"a",0,NULL,0,0,0,0,0},
278     {"script",scriptCommand,-2,"s",0,NULL,0,0,0,0,0},
279     {"time",timeCommand,1,"RF",0,NULL,0,0,0,0,0},
280     {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
281     {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
282     {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0},
283     {"wait",waitCommand,3,"s",0,NULL,0,0,0,0,0},
284     {"command",commandCommand,0,"lt",0,NULL,0,0,0,0,0},
285     {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
286     {"georadius",georadiusCommand,-6,"w",0,NULL,1,1,1,0,0},
287     {"georadiusbymember",georadiusByMemberCommand,-5,"w",0,NULL,1,1,1,0,0},
288     {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
289     {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
290     {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
291     {"pfselftest",pfselftestCommand,1,"a",0,NULL,0,0,0,0,0},
292     {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
293     {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
294     {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
295     {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
296     {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
297 };
298 
/* Forward declaration: evictionPoolAlloc() takes no arguments and returns a
 * pointer to a struct evictionPoolEntry. */
struct evictionPoolEntry *evictionPoolAlloc(void);
300 
301 /*============================ Utility functions ============================ */
302 
303 /* Low level logging. To use only for very big messages, otherwise
304  * serverLog() is to prefer. */
305 void serverLogRaw(int level, const char *msg) {
306     const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING };
307     const char *c = ".-*#";
308     FILE *fp;
309     char buf[64];
310     int rawmode = (level & LL_RAW);
311     int log_to_stdout = server.logfile[0] == '\0';
312 
313     level &= 0xff; /* clear flags */
314     if (level < server.verbosity) return;
315 
316     fp = log_to_stdout ? stdout : fopen(server.logfile,"a");
317     if (!fp) return;
318 
319     if (rawmode) {
320         fprintf(fp,"%s",msg);
321     } else {
322         int off;
323         struct timeval tv;
324         int role_char;
325         pid_t pid = getpid();
326 
327         gettimeofday(&tv,NULL);
328         off = strftime(buf,sizeof(buf),"%d %b %H:%M:%S.",localtime(&tv.tv_sec));
329         snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000);
330         if (server.sentinel_mode) {
331             role_char = 'X'; /* Sentinel. */
332         } else if (pid != server.pid) {
333             role_char = 'C'; /* RDB / AOF writing child. */
334         } else {
335             role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */
336         }
337         fprintf(fp,"%d:%c %s %c %s\n",
338             (int)getpid(),role_char, buf,c[level],msg);
339     }
340     fflush(fp);
341 
342     if (!log_to_stdout) fclose(fp);
343     if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg);
344 }
345 
346 /* Like serverLogRaw() but with printf-alike support. This is the function that
347  * is used across the code. The raw version is only used in order to dump
348  * the INFO output on crash. */
349 void serverLog(int level, const char *fmt, ...) {
350     va_list ap;
351     char msg[LOG_MAX_LEN];
352 
353     if ((level&0xff) < server.verbosity) return;
354 
355     va_start(ap, fmt);
356     vsnprintf(msg, sizeof(msg), fmt, ap);
357     va_end(ap);
358 
359     serverLogRaw(level,msg);
360 }
361 
362 /* Log a fixed message without printf-alike capabilities, in a way that is
363  * safe to call from a signal handler.
364  *
365  * We actually use this only for signals that are not fatal from the point
366  * of view of Redis. Signals that are going to kill the server anyway and
367  * where we need printf-alike features are served by serverLog(). */
368 void serverLogFromHandler(int level, const char *msg) {
369     int fd;
370     int log_to_stdout = server.logfile[0] == '\0';
371     char buf[64];
372 
373     if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize))
374         return;
375     fd = log_to_stdout ? STDOUT_FILENO :
376                          open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644);
377     if (fd == -1) return;
378     ll2string(buf,sizeof(buf),getpid());
379     if (write(fd,buf,strlen(buf)) == -1) goto err;
380     if (write(fd,":signal-handler (",17) == -1) goto err;
381     ll2string(buf,sizeof(buf),time(NULL));
382     if (write(fd,buf,strlen(buf)) == -1) goto err;
383     if (write(fd,") ",2) == -1) goto err;
384     if (write(fd,msg,strlen(msg)) == -1) goto err;
385     if (write(fd,"\n",1) == -1) goto err;
386 err:
387     if (!log_to_stdout) close(fd);
388 }
389 
/* Return the UNIX time in microseconds, as reported by gettimeofday().
 * The seconds are widened to long long before being scaled, so the
 * multiplication is carried out in 64 bit arithmetic. */
long long ustime(void) {
    struct timeval now;

    gettimeofday(&now, NULL);
    return ((long long)now.tv_sec)*1000000 + now.tv_usec;
}
400 
401 /* Return the UNIX time in milliseconds */
402 mstime_t mstime(void) {
403     return ustime()/1000;
404 }
405 
/* Terminate a child process with exit status 'retcode'.
 *
 * After an RDB dump or AOF rewrite the children exit using _exit() instead
 * of exit(), because the latter may interact with the same file objects used
 * by the parent process. However when COVERAGE_TEST is defined the normal
 * exit() is used, in order to obtain the right coverage information. */
void exitFromChild(int retcode) {
#ifndef COVERAGE_TEST
    _exit(retcode);
#else
    exit(retcode);
#endif
}
417 
418 /*====================== Hash table type implementation  ==================== */
419 
420 /* This is a hash table type that uses the SDS dynamic strings library as
421  * keys and redis objects as values (objects can hold SDS strings,
422  * lists, sets). */
423 
/* Dict destructor callback that releases 'val' with zfree().
 * 'privdata' is not used. */
void dictVanillaFree(void *privdata, void *val) {
    DICT_NOTUSED(privdata);

    zfree(val);
}
429 
430 void dictListDestructor(void *privdata, void *val)
431 {
432     DICT_NOTUSED(privdata);
433     listRelease((list*)val);
434 }
435 
436 int dictSdsKeyCompare(void *privdata, const void *key1,
437         const void *key2)
438 {
439     int l1,l2;
440     DICT_NOTUSED(privdata);
441 
442     l1 = sdslen((sds)key1);
443     l2 = sdslen((sds)key2);
444     if (l1 != l2) return 0;
445     return memcmp(key1, key2, l1) == 0;
446 }
447 
/* Case insensitive key comparison, used for the command lookup table and
 * other places where a case insensitive, non binary-safe comparison is
 * needed (the keys are compared as C strings with strcasecmp()).
 *
 * Returns 1 if the two keys match, 0 otherwise. 'privdata' is not used. */
int dictSdsKeyCaseCompare(void *privdata, const void *key1,
        const void *key2)
{
    DICT_NOTUSED(privdata);

    return !strcasecmp(key1, key2);
}
457 
/* Dict destructor callback for entries that are Redis objects: drops one
 * reference with decrRefCount(). A NULL 'val' is accepted and ignored
 * (values of swapped out keys are set to NULL). 'privdata' is not used. */
void dictObjectDestructor(void *privdata, void *val) {
    DICT_NOTUSED(privdata);

    if (val != NULL) decrRefCount(val);
}
465 
/* Dict destructor callback for entries that are sds strings: 'val' is
 * released with sdsfree(). 'privdata' is not used. */
void dictSdsDestructor(void *privdata, void *val) {
    DICT_NOTUSED(privdata);
    sdsfree(val);
}
472 
473 int dictObjKeyCompare(void *privdata, const void *key1,
474         const void *key2)
475 {
476     const robj *o1 = key1, *o2 = key2;
477     return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
478 }
479 
480 unsigned int dictObjHash(const void *key) {
481     const robj *o = key;
482     return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
483 }
484 
/* Hash function for dicts whose keys are sds strings: the sdslen() bytes of
 * the key are hashed with dictGenHashFunction(). */
unsigned int dictSdsHash(const void *key) {
    size_t len = sdslen((char*)key);

    return dictGenHashFunction((unsigned char*)key, len);
}
488 
/* Hash function for dicts whose keys are sds strings, using
 * dictGenCaseHashFunction() on the sdslen() bytes of the key. It is paired
 * with dictSdsKeyCaseCompare() in the dict types of this file. */
unsigned int dictSdsCaseHash(const void *key) {
    size_t len = sdslen((char*)key);

    return dictGenCaseHashFunction((unsigned char*)key, len);
}
492 
493 int dictEncObjKeyCompare(void *privdata, const void *key1,
494         const void *key2)
495 {
496     robj *o1 = (robj*) key1, *o2 = (robj*) key2;
497     int cmp;
498 
499     if (o1->encoding == OBJ_ENCODING_INT &&
500         o2->encoding == OBJ_ENCODING_INT)
501             return o1->ptr == o2->ptr;
502 
503     o1 = getDecodedObject(o1);
504     o2 = getDecodedObject(o2);
505     cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
506     decrRefCount(o1);
507     decrRefCount(o2);
508     return cmp;
509 }
510 
511 unsigned int dictEncObjHash(const void *key) {
512     robj *o = (robj*) key;
513 
514     if (sdsEncodedObject(o)) {
515         return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
516     } else {
517         if (o->encoding == OBJ_ENCODING_INT) {
518             char buf[32];
519             int len;
520 
521             len = ll2string(buf,32,(long)o->ptr);
522             return dictGenHashFunction((unsigned char*)buf, len);
523         } else {
524             unsigned int hash;
525 
526             o = getDecodedObject(o);
527             hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
528             decrRefCount(o);
529             return hash;
530         }
531     }
532 }
533 
/* Sets type hash table: keys are hashed and compared with the encoding
 * aware object helpers and released with dictObjectDestructor(); no value
 * destructor is installed. */
dictType setDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictObjectDestructor,      /* key destructor */
    NULL                       /* val destructor */
};
543 
/* Sorted sets hash (note: a skiplist is used in addition to the hash table).
 * Same callbacks as the sets type: object keys released with
 * dictObjectDestructor(), no value destructor. */
dictType zsetDictType = {
    dictEncObjHash,            /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictEncObjKeyCompare,      /* key compare */
    dictObjectDestructor,      /* key destructor */
    NULL                       /* val destructor */
};
553 
/* Db->dict, keys are sds strings, vals are Redis objects. Both are owned by
 * the dict: keys are released with dictSdsDestructor() and values with
 * dictObjectDestructor(). */
dictType dbDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictObjectDestructor        /* val destructor */
};
563 
/* server.lua_scripts sha (as sds string) -> scripts (as robj) cache.
 * Keys are hashed and compared in a case insensitive way. */
dictType shaScriptObjectDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictObjectDestructor        /* val destructor */
};
573 
/* Db->expires. Keys are sds strings; neither keys nor values are released
 * by this dict (no destructors are installed).
 * NOTE(review): presumably the key strings are shared with Db->dict, which
 * owns them -- confirm against the code adding entries to 'expires'. */
dictType keyptrDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    NULL                       /* val destructor */
};
583 
/* Command table. sds string -> command struct pointer. Lookups are case
 * insensitive; the key strings are released with dictSdsDestructor() while
 * the command structures are not freed by the dict. */
dictType commandTableDictType = {
    dictSdsCaseHash,           /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCaseCompare,     /* key compare */
    dictSdsDestructor,         /* key destructor */
    NULL                       /* val destructor */
};
593 
/* Hash type hash table (note that small hashes are represented with
 * ziplists). Both fields (keys) and values are Redis objects released with
 * dictObjectDestructor(). */
dictType hashDictType = {
    dictEncObjHash,             /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictEncObjKeyCompare,       /* key compare */
    dictObjectDestructor,       /* key destructor */
    dictObjectDestructor        /* val destructor */
};
603 
/* Keylist hash table type has unencoded redis objects as keys and
 * lists as values. It's used for blocking operations (BLPOP) and to
 * map swapped keys to a list of clients waiting for these keys to be
 * loaded. Values are released with dictListDestructor(). */
dictType keylistDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictObjectDestructor,       /* key destructor */
    dictListDestructor          /* val destructor */
};
615 
/* Cluster nodes hash table, mapping node addresses (e.g. 1.2.3.4:6379) to
 * clusterNode structures. The sds keys are released by the dict, the
 * clusterNode values are not (no value destructor). */
dictType clusterNodesDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};
626 
/* Cluster re-addition blacklist. This maps node IDs to the time
 * we can re-add this node. The goal is to avoid re-adding a removed
 * node for some time. Node IDs are matched in a case insensitive way. */
dictType clusterNodesBlackListDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};
638 
/* Migrate cache dict type: sds string keys (binary safe hashing and
 * comparison) released with dictSdsDestructor(); values are not freed by
 * the dict. */
dictType migrateCacheDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};
648 
/* Replication cached script dict (server.repl_scriptcache_dict).
 * Keys are sds SHA1 strings, matched in a case insensitive way, while
 * values are not used at all in the current implementation. */
dictType replScriptCacheDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};
660 
661 int htNeedsResize(dict *dict) {
662     long long size, used;
663 
664     size = dictSlots(dict);
665     used = dictSize(dict);
666     return (size > DICT_HT_INITIAL_SIZE &&
667             (used*100/size < HASHTABLE_MIN_FILL));
668 }
669 
670 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
671  * we resize the hash table to save memory */
672 void tryResizeHashTables(int dbid) {
673     if (htNeedsResize(server.db[dbid].dict))
674         dictResize(server.db[dbid].dict);
675     if (htNeedsResize(server.db[dbid].expires))
676         dictResize(server.db[dbid].expires);
677 }
678 
679 /* Our hash table implementation performs rehashing incrementally while
680  * we write/read from the hash table. Still if the server is idle, the hash
681  * table will use two tables for a long time. So we try to use 1 millisecond
682  * of CPU time at every call of this function to perform some rehahsing.
683  *
684  * The function returns 1 if some rehashing was performed, otherwise 0
685  * is returned. */
686 int incrementallyRehash(int dbid) {
687     /* Keys dictionary */
688     if (dictIsRehashing(server.db[dbid].dict)) {
689         dictRehashMilliseconds(server.db[dbid].dict,1);
690         return 1; /* already used our millisecond for this loop... */
691     }
692     /* Expires */
693     if (dictIsRehashing(server.db[dbid].expires)) {
694         dictRehashMilliseconds(server.db[dbid].expires,1);
695         return 1; /* already used our millisecond for this loop... */
696     }
697     return 0;
698 }
699 
700 /* This function is called once a background process of some kind terminates,
701  * as we want to avoid resizing the hash tables when there is a child in order
702  * to play well with copy-on-write (otherwise when a resize happens lots of
703  * memory pages are copied). The goal of this function is to update the ability
704  * for dict.c to resize the hash tables accordingly to the fact we have o not
705  * running childs. */
706 void updateDictResizePolicy(void) {
707     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
708         dictEnableResize();
709     else
710         dictDisableResize();
711 }
712 
713 /* ======================= Cron: called every 100 ms ======================== */
714 
715 /* Helper function for the activeExpireCycle() function.
716  * This function will try to expire the key that is stored in the hash table
717  * entry 'de' of the 'expires' hash table of a Redis database.
718  *
719  * If the key is found to be expired, it is removed from the database and
720  * 1 is returned. Otherwise no operation is performed and 0 is returned.
721  *
722  * When a key is expired, server.stat_expiredkeys is incremented.
723  *
724  * The parameter 'now' is the current time in milliseconds as is passed
725  * to the function to avoid too many gettimeofday() syscalls. */
726 int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
727     long long t = dictGetSignedIntegerVal(de);
728     if (now > t) {
729         sds key = dictGetKey(de);
730         robj *keyobj = createStringObject(key,sdslen(key));
731 
732         propagateExpire(db,keyobj);
733         dbDelete(db,keyobj);
734         notifyKeyspaceEvent(NOTIFY_EXPIRED,
735             "expired",keyobj,db->id);
736         decrRefCount(keyobj);
737         server.stat_expiredkeys++;
738         return 1;
739     } else {
740         return 0;
741     }
742 }
743 
744 /* Try to expire a few timed out keys. The algorithm used is adaptive and
745  * will use few CPU cycles if there are few expiring keys, otherwise
746  * it will get more aggressive to avoid that too much memory is used by
747  * keys that can be removed from the keyspace.
748  *
749  * No more than CRON_DBS_PER_CALL databases are tested at every
750  * iteration.
751  *
752  * This kind of call is used when Redis detects that timelimit_exit is
753  * true, so there is more work to do, and we do it more incrementally from
754  * the beforeSleep() function of the event loop.
755  *
756  * Expire cycle type:
757  *
758  * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
759  * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
760  * microseconds, and is not repeated again before the same amount of time.
761  *
762  * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
763  * executed, where the time limit is a percentage of the REDIS_HZ period
764  * as specified by the REDIS_EXPIRELOOKUPS_TIME_PERC define. */
765 
766 void activeExpireCycle(int type) {
767     /* This function has some global state in order to continue the work
768      * incrementally across calls. */
769     static unsigned int current_db = 0; /* Last DB tested. */
770     static int timelimit_exit = 0;      /* Time limit hit in previous call? */
771     static long long last_fast_cycle = 0; /* When last fast cycle ran. */
772 
773     int j, iteration = 0;
774     int dbs_per_call = CRON_DBS_PER_CALL;
775     long long start = ustime(), timelimit;
776 
777     if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
778         /* Don't start a fast cycle if the previous cycle did not exited
779          * for time limt. Also don't repeat a fast cycle for the same period
780          * as the fast cycle total duration itself. */
781         if (!timelimit_exit) return;
782         if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
783         last_fast_cycle = start;
784     }
785 
786     /* We usually should test CRON_DBS_PER_CALL per iteration, with
787      * two exceptions:
788      *
789      * 1) Don't test more DBs than we have.
790      * 2) If last time we hit the time limit, we want to scan all DBs
791      * in this iteration, as there is work to do in some DB and we don't want
792      * expired keys to use memory for too much time. */
793     if (dbs_per_call > server.dbnum || timelimit_exit)
794         dbs_per_call = server.dbnum;
795 
796     /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
797      * per iteration. Since this function gets called with a frequency of
798      * server.hz times per second, the following is the max amount of
799      * microseconds we can spend in this function. */
800     timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
801     timelimit_exit = 0;
802     if (timelimit <= 0) timelimit = 1;
803 
804     if (type == ACTIVE_EXPIRE_CYCLE_FAST)
805         timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */
806 
807     for (j = 0; j < dbs_per_call; j++) {
808         int expired;
809         redisDb *db = server.db+(current_db % server.dbnum);
810 
811         /* Increment the DB now so we are sure if we run out of time
812          * in the current DB we'll restart from the next. This allows to
813          * distribute the time evenly across DBs. */
814         current_db++;
815 
816         /* Continue to expire if at the end of the cycle more than 25%
817          * of the keys were expired. */
818         do {
819             unsigned long num, slots;
820             long long now, ttl_sum;
821             int ttl_samples;
822 
823             /* If there is nothing to expire try next DB ASAP. */
824             if ((num = dictSize(db->expires)) == 0) {
825                 db->avg_ttl = 0;
826                 break;
827             }
828             slots = dictSlots(db->expires);
829             now = mstime();
830 
831             /* When there are less than 1% filled slots getting random
832              * keys is expensive, so stop here waiting for better times...
833              * The dictionary will be resized asap. */
834             if (num && slots > DICT_HT_INITIAL_SIZE &&
835                 (num*100/slots < 1)) break;
836 
837             /* The main collection cycle. Sample random keys among keys
838              * with an expire set, checking for expired ones. */
839             expired = 0;
840             ttl_sum = 0;
841             ttl_samples = 0;
842 
843             if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
844                 num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
845 
846             while (num--) {
847                 dictEntry *de;
848                 long long ttl;
849 
850                 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
851                 ttl = dictGetSignedIntegerVal(de)-now;
852                 if (activeExpireCycleTryExpire(db,de,now)) expired++;
853                 if (ttl > 0) {
854                     /* We want the average TTL of keys yet not expired. */
855                     ttl_sum += ttl;
856                     ttl_samples++;
857                 }
858             }
859 
860             /* Update the average TTL stats for this database. */
861             if (ttl_samples) {
862                 long long avg_ttl = ttl_sum/ttl_samples;
863 
864                 /* Do a simple running average with a few samples.
865                  * We just use the current estimate with a weight of 2%
866                  * and the previous estimate with a weight of 98%. */
867                 if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
868                 db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
869             }
870 
871             /* We can't block forever here even if there are many keys to
872              * expire. So after a given amount of milliseconds return to the
873              * caller waiting for the other active expire cycle. */
874             iteration++;
875             if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
876                 long long elapsed = ustime()-start;
877 
878                 latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
879                 if (elapsed > timelimit) timelimit_exit = 1;
880             }
881             if (timelimit_exit) return;
882             /* We don't repeat the cycle if there are less than 25% of keys
883              * found expired in the current DB. */
884         } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
885     }
886 }
887 
888 unsigned int getLRUClock(void) {
889     return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
890 }
891 
892 /* Add a sample to the operations per second array of samples. */
893 void trackInstantaneousMetric(int metric, long long current_reading) {
894     long long t = mstime() - server.inst_metric[metric].last_sample_time;
895     long long ops = current_reading -
896                     server.inst_metric[metric].last_sample_count;
897     long long ops_sec;
898 
899     ops_sec = t > 0 ? (ops*1000/t) : 0;
900 
901     server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
902         ops_sec;
903     server.inst_metric[metric].idx++;
904     server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
905     server.inst_metric[metric].last_sample_time = mstime();
906     server.inst_metric[metric].last_sample_count = current_reading;
907 }
908 
909 /* Return the mean of all the samples. */
910 long long getInstantaneousMetric(int metric) {
911     int j;
912     long long sum = 0;
913 
914     for (j = 0; j < STATS_METRIC_SAMPLES; j++)
915         sum += server.inst_metric[metric].samples[j];
916     return sum / STATS_METRIC_SAMPLES;
917 }
918 
919 /* Check for timeouts. Returns non-zero if the client was terminated.
920  * The function gets the current time in milliseconds as argument since
921  * it gets called multiple times in a loop, so calling gettimeofday() for
922  * each iteration would be costly without any actual gain. */
923 int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
924     time_t now = now_ms/1000;
925 
926     if (server.maxidletime &&
927         !(c->flags & CLIENT_SLAVE) &&    /* no timeout for slaves */
928         !(c->flags & CLIENT_MASTER) &&   /* no timeout for masters */
929         !(c->flags & CLIENT_BLOCKED) &&  /* no timeout for BLPOP */
930         !(c->flags & CLIENT_PUBSUB) &&   /* no timeout for Pub/Sub clients */
931         (now - c->lastinteraction > server.maxidletime))
932     {
933         serverLog(LL_VERBOSE,"Closing idle client");
934         freeClient(c);
935         return 1;
936     } else if (c->flags & CLIENT_BLOCKED) {
937         /* Blocked OPS timeout is handled with milliseconds resolution.
938          * However note that the actual resolution is limited by
939          * server.hz. */
940 
941         if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
942             /* Handle blocking operation specific timeout. */
943             replyToBlockedClientTimedOut(c);
944             unblockClient(c);
945         } else if (server.cluster_enabled) {
946             /* Cluster: handle unblock & redirect of clients blocked
947              * into keys no longer served by this server. */
948             if (clusterRedirectBlockedClientIfNeeded(c))
949                 unblockClient(c);
950         }
951     }
952     return 0;
953 }
954 
955 /* The client query buffer is an sds.c string that can end with a lot of
956  * free space not used, this function reclaims space if needed.
957  *
958  * The function always returns 0 as it never terminates the client. */
959 int clientsCronResizeQueryBuffer(client *c) {
960     size_t querybuf_size = sdsAllocSize(c->querybuf);
961     time_t idletime = server.unixtime - c->lastinteraction;
962 
963     /* There are two conditions to resize the query buffer:
964      * 1) Query buffer is > BIG_ARG and too big for latest peak.
965      * 2) Client is inactive and the buffer is bigger than 1k. */
966     if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
967          (querybuf_size/(c->querybuf_peak+1)) > 2) ||
968          (querybuf_size > 1024 && idletime > 2))
969     {
970         /* Only resize the query buffer if it is actually wasting space. */
971         if (sdsavail(c->querybuf) > 1024) {
972             c->querybuf = sdsRemoveFreeSpace(c->querybuf);
973         }
974     }
975     /* Reset the peak again to capture the peak memory usage in the next
976      * cycle. */
977     c->querybuf_peak = 0;
978     return 0;
979 }
980 
981 #define CLIENTS_CRON_MIN_ITERATIONS 5
982 void clientsCron(void) {
983     /* Make sure to process at least numclients/server.hz of clients
984      * per call. Since this function is called server.hz times per second
985      * we are sure that in the worst case we process all the clients in 1
986      * second. */
987     int numclients = listLength(server.clients);
988     int iterations = numclients/server.hz;
989     mstime_t now = mstime();
990 
991     /* Process at least a few clients while we are at it, even if we need
992      * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
993      * of processing each client once per second. */
994     if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
995         iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
996                      numclients : CLIENTS_CRON_MIN_ITERATIONS;
997 
998     while(listLength(server.clients) && iterations--) {
999         client *c;
1000         listNode *head;
1001 
1002         /* Rotate the list, take the current head, process.
1003          * This way if the client must be removed from the list it's the
1004          * first element and we don't incur into O(N) computation. */
1005         listRotate(server.clients);
1006         head = listFirst(server.clients);
1007         c = listNodeValue(head);
1008         /* The following functions do different service checks on the client.
1009          * The protocol is that they return non-zero if the client was
1010          * terminated. */
1011         if (clientsCronHandleTimeout(c,now)) continue;
1012         if (clientsCronResizeQueryBuffer(c)) continue;
1013     }
1014 }
1015 
1016 /* This function handles 'background' operations we are required to do
1017  * incrementally in Redis databases, such as active key expiring, resizing,
1018  * rehashing. */
1019 void databasesCron(void) {
1020     /* Expire keys by random sampling. Not required for slaves
1021      * as master will synthesize DELs for us. */
1022     if (server.active_expire_enabled && server.masterhost == NULL)
1023         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
1024 
1025     /* Perform hash tables rehashing if needed, but only if there are no
1026      * other processes saving the DB on disk. Otherwise rehashing is bad
1027      * as will cause a lot of copy-on-write of memory pages. */
1028     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
1029         /* We use global counters so if we stop the computation at a given
1030          * DB we'll be able to start from the successive in the next
1031          * cron loop iteration. */
1032         static unsigned int resize_db = 0;
1033         static unsigned int rehash_db = 0;
1034         int dbs_per_call = CRON_DBS_PER_CALL;
1035         int j;
1036 
1037         /* Don't test more DBs than we have. */
1038         if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
1039 
1040         /* Resize */
1041         for (j = 0; j < dbs_per_call; j++) {
1042             tryResizeHashTables(resize_db % server.dbnum);
1043             resize_db++;
1044         }
1045 
1046         /* Rehash */
1047         if (server.activerehashing) {
1048             for (j = 0; j < dbs_per_call; j++) {
1049                 int work_done = incrementallyRehash(rehash_db % server.dbnum);
1050                 rehash_db++;
1051                 if (work_done) {
1052                     /* If the function did some work, stop here, we'll do
1053                      * more at the next cron loop. */
1054                     break;
1055                 }
1056             }
1057         }
1058     }
1059 }
1060 
1061 /* We take a cached value of the unix time in the global state because with
1062  * virtual memory and aging there is to store the current time in objects at
1063  * every object access, and accuracy is not needed. To access a global var is
1064  * a lot faster than calling time(NULL) */
1065 void updateCachedTime(void) {
1066     server.unixtime = time(NULL);
1067     server.mstime = mstime();
1068 }
1069 
1070 /* This is our timer interrupt, called server.hz times per second.
1071  * Here is where we do a number of things that need to be done asynchronously.
1072  * For instance:
1073  *
1074  * - Active expired keys collection (it is also performed in a lazy way on
1075  *   lookup).
1076  * - Software watchdog.
1077  * - Update some statistic.
1078  * - Incremental rehashing of the DBs hash tables.
1079  * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
1080  * - Clients timeout of different kinds.
1081  * - Replication reconnection.
1082  * - Many more...
1083  *
1084  * Everything directly called here will be called server.hz times per second,
1085  * so in order to throttle execution of things we want to do less frequently
1086  * a macro is used: run_with_period(milliseconds) { .... }
1087  */
1088 
1089 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1090     int j;
1091     UNUSED(eventLoop);
1092     UNUSED(id);
1093     UNUSED(clientData);
1094 
1095     /* Software watchdog: deliver the SIGALRM that will reach the signal
1096      * handler if we don't return here fast enough. */
1097     if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
1098 
1099     /* Update the time cache. */
1100     updateCachedTime();
1101 
1102     run_with_period(100) {
1103         trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
1104         trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
1105                 server.stat_net_input_bytes);
1106         trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
1107                 server.stat_net_output_bytes);
1108     }
1109 
1110     /* We have just LRU_BITS bits per object for LRU information.
1111      * So we use an (eventually wrapping) LRU clock.
1112      *
1113      * Note that even if the counter wraps it's not a big problem,
1114      * everything will still work but some object will appear younger
1115      * to Redis. However for this to happen a given object should never be
1116      * touched for all the time needed to the counter to wrap, which is
1117      * not likely.
1118      *
1119      * Note that you can change the resolution altering the
1120      * LRU_CLOCK_RESOLUTION define. */
1121     server.lruclock = getLRUClock();
1122 
1123     /* Record the max memory used since the server was started. */
1124     if (zmalloc_used_memory() > server.stat_peak_memory)
1125         server.stat_peak_memory = zmalloc_used_memory();
1126 
1127     /* Sample the RSS here since this is a relatively slow call. */
1128     server.resident_set_size = zmalloc_get_rss();
1129 
1130     /* We received a SIGTERM, shutting down here in a safe way, as it is
1131      * not ok doing so inside the signal handler. */
1132     if (server.shutdown_asap) {
1133         if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
1134         serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
1135         server.shutdown_asap = 0;
1136     }
1137 
1138     /* Show some info about non-empty databases */
1139     run_with_period(5000) {
1140         for (j = 0; j < server.dbnum; j++) {
1141             long long size, used, vkeys;
1142 
1143             size = dictSlots(server.db[j].dict);
1144             used = dictSize(server.db[j].dict);
1145             vkeys = dictSize(server.db[j].expires);
1146             if (used || vkeys) {
1147                 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1148                 /* dictPrintStats(server.dict); */
1149             }
1150         }
1151     }
1152 
1153     /* Show information about connected clients */
1154     if (!server.sentinel_mode) {
1155         run_with_period(5000) {
1156             serverLog(LL_VERBOSE,
1157                 "%lu clients connected (%lu slaves), %zu bytes in use",
1158                 listLength(server.clients)-listLength(server.slaves),
1159                 listLength(server.slaves),
1160                 zmalloc_used_memory());
1161         }
1162     }
1163 
1164     /* We need to do a few operations on clients asynchronously. */
1165     clientsCron();
1166 
1167     /* Handle background operations on Redis databases. */
1168     databasesCron();
1169 
1170     /* Start a scheduled AOF rewrite if this was requested by the user while
1171      * a BGSAVE was in progress. */
1172     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
1173         server.aof_rewrite_scheduled)
1174     {
1175         rewriteAppendOnlyFileBackground();
1176     }
1177 
1178     /* Check if a background saving or AOF rewrite in progress terminated. */
1179     if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
1180         ldbPendingChildren())
1181     {
1182         int statloc;
1183         pid_t pid;
1184 
1185         if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1186             int exitcode = WEXITSTATUS(statloc);
1187             int bysignal = 0;
1188 
1189             if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
1190 
1191             if (pid == -1) {
1192                 serverLog(LL_WARNING,"wait3() returned an error: %s. "
1193                     "rdb_child_pid = %d, aof_child_pid = %d",
1194                     strerror(errno),
1195                     (int) server.rdb_child_pid,
1196                     (int) server.aof_child_pid);
1197             } else if (pid == server.rdb_child_pid) {
1198                 backgroundSaveDoneHandler(exitcode,bysignal);
1199             } else if (pid == server.aof_child_pid) {
1200                 backgroundRewriteDoneHandler(exitcode,bysignal);
1201             } else {
1202                 if (!ldbRemoveChild(pid)) {
1203                     serverLog(LL_WARNING,
1204                         "Warning, detected child with unmatched pid: %ld",
1205                         (long)pid);
1206                 }
1207             }
1208             updateDictResizePolicy();
1209         }
1210     } else {
1211         /* If there is not a background saving/rewrite in progress check if
1212          * we have to save/rewrite now */
1213          for (j = 0; j < server.saveparamslen; j++) {
1214             struct saveparam *sp = server.saveparams+j;
1215 
1216             /* Save if we reached the given amount of changes,
1217              * the given amount of seconds, and if the latest bgsave was
1218              * successful or if, in case of an error, at least
1219              * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
1220             if (server.dirty >= sp->changes &&
1221                 server.unixtime-server.lastsave > sp->seconds &&
1222                 (server.unixtime-server.lastbgsave_try >
1223                  CONFIG_BGSAVE_RETRY_DELAY ||
1224                  server.lastbgsave_status == C_OK))
1225             {
1226                 serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
1227                     sp->changes, (int)sp->seconds);
1228                 rdbSaveBackground(server.rdb_filename);
1229                 break;
1230             }
1231          }
1232 
1233          /* Trigger an AOF rewrite if needed */
1234          if (server.rdb_child_pid == -1 &&
1235              server.aof_child_pid == -1 &&
1236              server.aof_rewrite_perc &&
1237              server.aof_current_size > server.aof_rewrite_min_size)
1238          {
1239             long long base = server.aof_rewrite_base_size ?
1240                             server.aof_rewrite_base_size : 1;
1241             long long growth = (server.aof_current_size*100/base) - 100;
1242             if (growth >= server.aof_rewrite_perc) {
1243                 serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
1244                 rewriteAppendOnlyFileBackground();
1245             }
1246          }
1247     }
1248 
1249 
1250     /* AOF postponed flush: Try at every cron cycle if the slow fsync
1251      * completed. */
1252     if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
1253 
1254     /* AOF write errors: in this case we have a buffer to flush as well and
1255      * clear the AOF error in case of success to make the DB writable again,
1256      * however to try every second is enough in case of 'hz' is set to
1257      * an higher frequency. */
1258     run_with_period(1000) {
1259         if (server.aof_last_write_status == C_ERR)
1260             flushAppendOnlyFile(0);
1261     }
1262 
1263     /* Close clients that need to be closed asynchronous */
1264     freeClientsInAsyncFreeQueue();
1265 
1266     /* Clear the paused clients flag if needed. */
1267     clientsArePaused(); /* Don't check return value, just use the side effect. */
1268 
1269     /* Replication cron function -- used to reconnect to master and
1270      * to detect transfer failures. */
1271     run_with_period(1000) replicationCron();
1272 
1273     /* Run the Redis Cluster cron. */
1274     run_with_period(100) {
1275         if (server.cluster_enabled) clusterCron();
1276     }
1277 
1278     /* Run the Sentinel timer if we are in sentinel mode. */
1279     run_with_period(100) {
1280         if (server.sentinel_mode) sentinelTimer();
1281     }
1282 
1283     /* Cleanup expired MIGRATE cached sockets. */
1284     run_with_period(1000) {
1285         migrateCloseTimedoutSockets();
1286     }
1287 
1288     server.cronloops++;
1289     return 1000/server.hz;
1290 }
1291 
1292 /* This function gets called every time Redis is entering the
1293  * main loop of the event driven library, that is, before to sleep
1294  * for ready file descriptors. */
1295 void beforeSleep(struct aeEventLoop *eventLoop) {
1296     UNUSED(eventLoop);
1297 
1298     /* Call the Redis Cluster before sleep function. Note that this function
1299      * may change the state of Redis Cluster (from ok to fail or vice versa),
1300      * so it's a good idea to call it before serving the unblocked clients
1301      * later in this function. */
1302     if (server.cluster_enabled) clusterBeforeSleep();
1303 
1304     /* Run a fast expire cycle (the called function will return
1305      * ASAP if a fast cycle is not needed). */
1306     if (server.active_expire_enabled && server.masterhost == NULL)
1307         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
1308 
1309     /* Send all the slaves an ACK request if at least one client blocked
1310      * during the previous event loop iteration. */
1311     if (server.get_ack_from_slaves) {
1312         robj *argv[3];
1313 
1314         argv[0] = createStringObject("REPLCONF",8);
1315         argv[1] = createStringObject("GETACK",6);
1316         argv[2] = createStringObject("*",1); /* Not used argument. */
1317         replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
1318         decrRefCount(argv[0]);
1319         decrRefCount(argv[1]);
1320         decrRefCount(argv[2]);
1321         server.get_ack_from_slaves = 0;
1322     }
1323 
1324     /* Unblock all the clients blocked for synchronous replication
1325      * in WAIT. */
1326     if (listLength(server.clients_waiting_acks))
1327         processClientsWaitingReplicas();
1328 
1329     /* Try to process pending commands for clients that were just unblocked. */
1330     if (listLength(server.unblocked_clients))
1331         processUnblockedClients();
1332 
1333     /* Write the AOF buffer on disk */
1334     flushAppendOnlyFile(0);
1335 
1336     /* Handle writes with pending output buffers. */
1337     handleClientsWithPendingWrites();
1338 }
1339 
1340 /* =========================== Server initialization ======================== */
1341 
1342 void createSharedObjects(void) {
1343     int j;
1344 
1345     shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
1346     shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
1347     shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
1348     shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
1349     shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
1350     shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
1351     shared.cnegone = createObject(OBJ_STRING,sdsnew(":-1\r\n"));
1352     shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
1353     shared.nullmultibulk = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
1354     shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n"));
1355     shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
1356     shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
1357     shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
1358     shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
1359         "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
1360     shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
1361         "-ERR no such key\r\n"));
1362     shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
1363         "-ERR syntax error\r\n"));
1364     shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
1365         "-ERR source and destination objects are the same\r\n"));
1366     shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
1367         "-ERR index out of range\r\n"));
1368     shared.noscripterr = createObject(OBJ_STRING,sdsnew(
1369         "-NOSCRIPT No matching script. Please use EVAL.\r\n"));
1370     shared.loadingerr = createObject(OBJ_STRING,sdsnew(
1371         "-LOADING Redis is loading the dataset in memory\r\n"));
1372     shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
1373         "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
1374     shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
1375         "-MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n"));
1376     shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
1377         "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n"));
1378     shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
1379         "-READONLY You can't write against a read only slave.\r\n"));
1380     shared.noautherr = createObject(OBJ_STRING,sdsnew(
1381         "-NOAUTH Authentication required.\r\n"));
1382     shared.oomerr = createObject(OBJ_STRING,sdsnew(
1383         "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
1384     shared.execaborterr = createObject(OBJ_STRING,sdsnew(
1385         "-EXECABORT Transaction discarded because of previous errors.\r\n"));
1386     shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
1387         "-NOREPLICAS Not enough good slaves to write.\r\n"));
1388     shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
1389         "-BUSYKEY Target key name already exists.\r\n"));
1390     shared.space = createObject(OBJ_STRING,sdsnew(" "));
1391     shared.colon = createObject(OBJ_STRING,sdsnew(":"));
1392     shared.plus = createObject(OBJ_STRING,sdsnew("+"));
1393 
1394     for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
1395         char dictid_str[64];
1396         int dictid_len;
1397 
1398         dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
1399         shared.select[j] = createObject(OBJ_STRING,
1400             sdscatprintf(sdsempty(),
1401                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
1402                 dictid_len, dictid_str));
1403     }
1404     shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
1405     shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
1406     shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
1407     shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
1408     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
1409     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
1410     shared.del = createStringObject("DEL",3);
1411     shared.rpop = createStringObject("RPOP",4);
1412     shared.lpop = createStringObject("LPOP",4);
1413     shared.lpush = createStringObject("LPUSH",5);
1414     for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
1415         shared.integers[j] = createObject(OBJ_STRING,(void*)(long)j);
1416         shared.integers[j]->encoding = OBJ_ENCODING_INT;
1417     }
1418     for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {
1419         shared.mbulkhdr[j] = createObject(OBJ_STRING,
1420             sdscatprintf(sdsempty(),"*%d\r\n",j));
1421         shared.bulkhdr[j] = createObject(OBJ_STRING,
1422             sdscatprintf(sdsempty(),"$%d\r\n",j));
1423     }
1424     /* The following two shared objects, minstring and maxstrings, are not
1425      * actually used for their value but as a special object meaning
1426      * respectively the minimum possible string and the maximum possible
1427      * string in string comparisons for the ZRANGEBYLEX command. */
1428     shared.minstring = createStringObject("minstring",9);
1429     shared.maxstring = createStringObject("maxstring",9);
1430 }
1431 
1432 void initServerConfig(void) {
1433     int j;
1434 
1435     getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
1436     server.configfile = NULL;
1437     server.executable = NULL;
1438     server.hz = CONFIG_DEFAULT_HZ;
1439     server.runid[CONFIG_RUN_ID_SIZE] = '\0';
1440     server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
1441     server.port = CONFIG_DEFAULT_SERVER_PORT;
1442     server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
1443     server.bindaddr_count = 0;
1444     server.unixsocket = NULL;
1445     server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
1446     server.ipfd_count = 0;
1447     server.sofd = -1;
1448     server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE;
1449     server.dbnum = CONFIG_DEFAULT_DBNUM;
1450     server.verbosity = CONFIG_DEFAULT_VERBOSITY;
1451     server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
1452     server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE;
1453     server.active_expire_enabled = 1;
1454     server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
1455     server.saveparams = NULL;
1456     server.loading = 0;
1457     server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);
1458     server.syslog_enabled = CONFIG_DEFAULT_SYSLOG_ENABLED;
1459     server.syslog_ident = zstrdup(CONFIG_DEFAULT_SYSLOG_IDENT);
1460     server.syslog_facility = LOG_LOCAL0;
1461     server.daemonize = CONFIG_DEFAULT_DAEMONIZE;
1462     server.supervised = 0;
1463     server.supervised_mode = SUPERVISED_NONE;
1464     server.aof_state = AOF_OFF;
1465     server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC;
1466     server.aof_no_fsync_on_rewrite = CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE;
1467     server.aof_rewrite_perc = AOF_REWRITE_PERC;
1468     server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
1469     server.aof_rewrite_base_size = 0;
1470     server.aof_rewrite_scheduled = 0;
1471     server.aof_last_fsync = time(NULL);
1472     server.aof_rewrite_time_last = -1;
1473     server.aof_rewrite_time_start = -1;
1474     server.aof_lastbgrewrite_status = C_OK;
1475     server.aof_delayed_fsync = 0;
1476     server.aof_fd = -1;
1477     server.aof_selected_db = -1; /* Make sure the first time will not match */
1478     server.aof_flush_postponed_start = 0;
1479     server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
1480     server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
1481     server.pidfile = NULL;
1482     server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
1483     server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
1484     server.requirepass = NULL;
1485     server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION;
1486     server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM;
1487     server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
1488     server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING;
1489     server.notify_keyspace_events = 0;
1490     server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
1491     server.bpop_blocked_clients = 0;
1492     server.maxmemory = CONFIG_DEFAULT_MAXMEMORY;
1493     server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY;
1494     server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES;
1495     server.hash_max_ziplist_entries = OBJ_HASH_MAX_ZIPLIST_ENTRIES;
1496     server.hash_max_ziplist_value = OBJ_HASH_MAX_ZIPLIST_VALUE;
1497     server.list_max_ziplist_size = OBJ_LIST_MAX_ZIPLIST_SIZE;
1498     server.list_compress_depth = OBJ_LIST_COMPRESS_DEPTH;
1499     server.set_max_intset_entries = OBJ_SET_MAX_INTSET_ENTRIES;
1500     server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES;
1501     server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE;
1502     server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES;
1503     server.shutdown_asap = 0;
1504     server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
1505     server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
1506     server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE;
1507     server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG;
1508     server.cluster_enabled = 0;
1509     server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT;
1510     server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER;
1511     server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY;
1512     server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE;
1513     server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
1514     server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
1515     server.next_client_id = 1; /* Client IDs, start from 1 .*/
1516     server.loading_process_events_interval_bytes = (1024*1024*2);
1517 
1518     server.lruclock = getLRUClock();
1519     resetServerSaveParams();
1520 
1521     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
1522     appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
1523     appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1524     /* Replication related */
1525     server.masterauth = NULL;
1526     server.masterhost = NULL;
1527     server.masterport = 6379;
1528     server.master = NULL;
1529     server.cached_master = NULL;
1530     server.repl_master_initial_offset = -1;
1531     server.repl_state = REPL_STATE_NONE;
1532     server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
1533     server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
1534     server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
1535     server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
1536     server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
1537     server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
1538     server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
1539     server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY;
1540     server.master_repl_offset = 0;
1541 
1542     /* Replication partial resync backlog */
1543     server.repl_backlog = NULL;
1544     server.repl_backlog_size = CONFIG_DEFAULT_REPL_BACKLOG_SIZE;
1545     server.repl_backlog_histlen = 0;
1546     server.repl_backlog_idx = 0;
1547     server.repl_backlog_off = 0;
1548     server.repl_backlog_time_limit = CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
1549     server.repl_no_slaves_since = time(NULL);
1550 
1551     /* Client output buffer limits */
1552     for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
1553         server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
1554 
1555     /* Double constants initialization */
1556     R_Zero = 0.0;
1557     R_PosInf = 1.0/R_Zero;
1558     R_NegInf = -1.0/R_Zero;
1559     R_Nan = R_Zero/R_Zero;
1560 
1561     /* Command table -- we initiialize it here as it is part of the
1562      * initial configuration, since command names may be changed via
1563      * redis.conf using the rename-command directive. */
1564     server.commands = dictCreate(&commandTableDictType,NULL);
1565     server.orig_commands = dictCreate(&commandTableDictType,NULL);
1566     populateCommandTable();
1567     server.delCommand = lookupCommandByCString("del");
1568     server.multiCommand = lookupCommandByCString("multi");
1569     server.lpushCommand = lookupCommandByCString("lpush");
1570     server.lpopCommand = lookupCommandByCString("lpop");
1571     server.rpopCommand = lookupCommandByCString("rpop");
1572     server.sremCommand = lookupCommandByCString("srem");
1573     server.execCommand = lookupCommandByCString("exec");
1574 
1575     /* Slow log */
1576     server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;
1577     server.slowlog_max_len = CONFIG_DEFAULT_SLOWLOG_MAX_LEN;
1578 
1579     /* Latency monitor */
1580     server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD;
1581 
1582     /* Debugging */
1583     server.assert_failed = "<no assertion failed>";
1584     server.assert_file = "<no file>";
1585     server.assert_line = 0;
1586     server.bug_report_start = 0;
1587     server.watchdog_period = 0;
1588 }
1589 
1590 extern char **environ;
1591 
1592 /* Restart the server, executing the same executable that started this
1593  * instance, with the same arguments and configuration file.
1594  *
1595  * The function is designed to directly call execve() so that the new
1596  * server instance will retain the PID of the previous one.
1597  *
1598  * The list of flags, that may be bitwise ORed together, alter the
1599  * behavior of this function:
1600  *
1601  * RESTART_SERVER_NONE              No flags.
1602  * RESTART_SERVER_GRACEFULLY        Do a proper shutdown before restarting.
1603  * RESTART_SERVER_CONFIG_REWRITE    Rewrite the config file before restarting.
1604  *
1605  * On success the function does not return, because the process turns into
1606  * a different process. On error C_ERR is returned. */
1607 int restartServer(int flags, mstime_t delay) {
1608     int j;
1609 
1610     /* Check if we still have accesses to the executable that started this
1611      * server instance. */
1612     if (access(server.executable,X_OK) == -1) return C_ERR;
1613 
1614     /* Config rewriting. */
1615     if (flags & RESTART_SERVER_CONFIG_REWRITE &&
1616         server.configfile &&
1617         rewriteConfig(server.configfile) == -1) return C_ERR;
1618 
1619     /* Perform a proper shutdown. */
1620     if (flags & RESTART_SERVER_GRACEFULLY &&
1621         prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK) return C_ERR;
1622 
1623     /* Close all file descriptors, with the exception of stdin, stdout, strerr
1624      * which are useful if we restart a Redis server which is not daemonized. */
1625     for (j = 3; j < (int)server.maxclients + 1024; j++) close(j);
1626 
1627     /* Execute the server with the original command line. */
1628     if (delay) usleep(delay*1000);
1629     execve(server.executable,server.exec_argv,environ);
1630 
1631     /* If an error occurred here, there is nothing we can do, but exit. */
1632     _exit(1);
1633 
1634     return C_ERR; /* Never reached. */
1635 }
1636 
1637 /* This function will try to raise the max number of open files accordingly to
1638  * the configured max number of clients. It also reserves a number of file
1639  * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
1640  * persistence, listening sockets, log files and so forth.
1641  *
1642  * If it will not be possible to set the limit accordingly to the configured
1643  * max number of clients, the function will do the reverse setting
1644  * server.maxclients to the value that we can actually handle. */
1645 void adjustOpenFilesLimit(void) {
1646     rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS;
1647     struct rlimit limit;
1648 
1649     if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {
1650         serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
1651             strerror(errno));
1652         server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS;
1653     } else {
1654         rlim_t oldlimit = limit.rlim_cur;
1655 
1656         /* Set the max number of files if the current limit is not enough
1657          * for our needs. */
1658         if (oldlimit < maxfiles) {
1659             rlim_t bestlimit;
1660             int setrlimit_error = 0;
1661 
1662             /* Try to set the file limit to match 'maxfiles' or at least
1663              * to the higher value supported less than maxfiles. */
1664             bestlimit = maxfiles;
1665             while(bestlimit > oldlimit) {
1666                 rlim_t decr_step = 16;
1667 
1668                 limit.rlim_cur = bestlimit;
1669                 limit.rlim_max = bestlimit;
1670                 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break;
1671                 setrlimit_error = errno;
1672 
1673                 /* We failed to set file limit to 'bestlimit'. Try with a
1674                  * smaller limit decrementing by a few FDs per iteration. */
1675                 if (bestlimit < decr_step) break;
1676                 bestlimit -= decr_step;
1677             }
1678 
1679             /* Assume that the limit we get initially is still valid if
1680              * our last try was even lower. */
1681             if (bestlimit < oldlimit) bestlimit = oldlimit;
1682 
1683             if (bestlimit < maxfiles) {
1684                 int old_maxclients = server.maxclients;
1685                 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS;
1686                 if (server.maxclients < 1) {
1687                     serverLog(LL_WARNING,"Your current 'ulimit -n' "
1688                         "of %llu is not enough for the server to start. "
1689                         "Please increase your open file limit to at least "
1690                         "%llu. Exiting.",
1691                         (unsigned long long) oldlimit,
1692                         (unsigned long long) maxfiles);
1693                     exit(1);
1694                 }
1695                 serverLog(LL_WARNING,"You requested maxclients of %d "
1696                     "requiring at least %llu max file descriptors.",
1697                     old_maxclients,
1698                     (unsigned long long) maxfiles);
1699                 serverLog(LL_WARNING,"Server can't set maximum open files "
1700                     "to %llu because of OS error: %s.",
1701                     (unsigned long long) maxfiles, strerror(setrlimit_error));
1702                 serverLog(LL_WARNING,"Current maximum open files is %llu. "
1703                     "maxclients has been reduced to %d to compensate for "
1704                     "low ulimit. "
1705                     "If you need higher maxclients increase 'ulimit -n'.",
1706                     (unsigned long long) bestlimit, server.maxclients);
1707             } else {
1708                 serverLog(LL_NOTICE,"Increased maximum number of open files "
1709                     "to %llu (it was originally set to %llu).",
1710                     (unsigned long long) maxfiles,
1711                     (unsigned long long) oldlimit);
1712             }
1713         }
1714     }
1715 }
1716 
/* Warn when the configured server.tcp_backlog is larger than the value
 * found in /proc/sys/net/core/somaxconn. The check is compiled in only
 * when HAVE_PROC_SOMAXCONN is defined; if the file cannot be opened or
 * read, nothing is reported. */
void checkTcpBacklogSettings(void) {
#ifdef HAVE_PROC_SOMAXCONN
    char line[1024];
    FILE *somaxconn_fp = fopen("/proc/sys/net/core/somaxconn","r");

    if (somaxconn_fp == NULL) return;

    if (fgets(line,sizeof(line),somaxconn_fp) != NULL) {
        int somaxconn = atoi(line);

        /* Non positive values are ignored. */
        if (somaxconn > 0 && server.tcp_backlog > somaxconn) {
            serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, somaxconn);
        }
    }
    fclose(somaxconn_fp);
#endif
}
1733 
1734 /* Initialize a set of file descriptors to listen to the specified 'port'
1735  * binding the addresses specified in the Redis server configuration.
1736  *
1737  * The listening file descriptors are stored in the integer array 'fds'
1738  * and their number is set in '*count'.
1739  *
1740  * The addresses to bind are specified in the global server.bindaddr array
1741  * and their number is server.bindaddr_count. If the server configuration
1742  * contains no specific addresses to bind, this function will try to
1743  * bind * (all addresses) for both the IPv4 and IPv6 protocols.
1744  *
1745  * On success the function returns C_OK.
1746  *
1747  * On error the function returns C_ERR. For the function to be on
1748  * error, at least one of the server.bindaddr addresses was
1749  * impossible to bind, or no bind addresses were specified in the server
1750  * configuration but the function is not able to bind * for at least
1751  * one of the IPv4 or IPv6 protocols. */
1752 int listenToPort(int port, int *fds, int *count) {
1753     int j;
1754 
1755     /* Force binding of 0.0.0.0 if no bind address is specified, always
1756      * entering the loop if j == 0. */
1757     if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
1758     for (j = 0; j < server.bindaddr_count || j == 0; j++) {
1759         if (server.bindaddr[j] == NULL) {
1760             /* Bind * for both IPv6 and IPv4, we enter here only if
1761              * server.bindaddr_count == 0. */
1762             fds[*count] = anetTcp6Server(server.neterr,port,NULL,
1763                 server.tcp_backlog);
1764             if (fds[*count] != ANET_ERR) {
1765                 anetNonBlock(NULL,fds[*count]);
1766                 (*count)++;
1767 
1768                 /* Bind the IPv4 address as well. */
1769                 fds[*count] = anetTcpServer(server.neterr,port,NULL,
1770                     server.tcp_backlog);
1771                 if (fds[*count] != ANET_ERR) {
1772                     anetNonBlock(NULL,fds[*count]);
1773                     (*count)++;
1774                 }
1775             }
1776             /* Exit the loop if we were able to bind * on IPv4 and IPv6,
1777              * otherwise fds[*count] will be ANET_ERR and we'll print an
1778              * error and return to the caller with an error. */
1779             if (*count == 2) break;
1780         } else if (strchr(server.bindaddr[j],':')) {
1781             /* Bind IPv6 address. */
1782             fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
1783                 server.tcp_backlog);
1784         } else {
1785             /* Bind IPv4 address. */
1786             fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
1787                 server.tcp_backlog);
1788         }
1789         if (fds[*count] == ANET_ERR) {
1790             serverLog(LL_WARNING,
1791                 "Creating Server TCP listening socket %s:%d: %s",
1792                 server.bindaddr[j] ? server.bindaddr[j] : "*",
1793                 port, server.neterr);
1794             return C_ERR;
1795         }
1796         anetNonBlock(NULL,fds[*count]);
1797         (*count)++;
1798     }
1799     return C_OK;
1800 }
1801 
1802 /* Resets the stats that we expose via INFO or other means that we want
1803  * to reset via CONFIG RESETSTAT. The function is also used in order to
1804  * initialize these fields in initServer() at server startup. */
1805 void resetServerStats(void) {
1806     int j;
1807 
1808     server.stat_numcommands = 0;
1809     server.stat_numconnections = 0;
1810     server.stat_expiredkeys = 0;
1811     server.stat_evictedkeys = 0;
1812     server.stat_keyspace_misses = 0;
1813     server.stat_keyspace_hits = 0;
1814     server.stat_fork_time = 0;
1815     server.stat_fork_rate = 0;
1816     server.stat_rejected_conn = 0;
1817     server.stat_sync_full = 0;
1818     server.stat_sync_partial_ok = 0;
1819     server.stat_sync_partial_err = 0;
1820     for (j = 0; j < STATS_METRIC_COUNT; j++) {
1821         server.inst_metric[j].idx = 0;
1822         server.inst_metric[j].last_sample_time = mstime();
1823         server.inst_metric[j].last_sample_count = 0;
1824         memset(server.inst_metric[j].samples,0,
1825             sizeof(server.inst_metric[j].samples));
1826     }
1827     server.stat_net_input_bytes = 0;
1828     server.stat_net_output_bytes = 0;
1829     server.aof_delayed_fsync = 0;
1830 }
1831 
1832 void initServer(void) {
1833     int j;
1834 
1835     signal(SIGHUP, SIG_IGN);
1836     signal(SIGPIPE, SIG_IGN);
1837     setupSignalHandlers();
1838 
1839     if (server.syslog_enabled) {
1840         openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
1841             server.syslog_facility);
1842     }
1843 
1844     server.pid = getpid();
1845     server.current_client = NULL;
1846     server.clients = listCreate();
1847     server.clients_to_close = listCreate();
1848     server.slaves = listCreate();
1849     server.monitors = listCreate();
1850     server.clients_pending_write = listCreate();
1851     server.slaveseldb = -1; /* Force to emit the first SELECT command. */
1852     server.unblocked_clients = listCreate();
1853     server.ready_keys = listCreate();
1854     server.clients_waiting_acks = listCreate();
1855     server.get_ack_from_slaves = 0;
1856     server.clients_paused = 0;
1857     server.system_memory_size = zmalloc_get_memory_size();
1858 
1859     createSharedObjects();
1860     adjustOpenFilesLimit();
1861     server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
1862     server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1863 
1864     /* Open the TCP listening socket for the user commands. */
1865     if (server.port != 0 &&
1866         listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
1867         exit(1);
1868 
1869     /* Open the listening Unix domain socket. */
1870     if (server.unixsocket != NULL) {
1871         unlink(server.unixsocket); /* don't care if this fails */
1872         server.sofd = anetUnixServer(server.neterr,server.unixsocket,
1873             server.unixsocketperm, server.tcp_backlog);
1874         if (server.sofd == ANET_ERR) {
1875             serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
1876             exit(1);
1877         }
1878         anetNonBlock(NULL,server.sofd);
1879     }
1880 
1881     /* Abort if there are no listening sockets at all. */
1882     if (server.ipfd_count == 0 && server.sofd < 0) {
1883         serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
1884         exit(1);
1885     }
1886 
1887     /* Create the Redis databases, and initialize other internal state. */
1888     for (j = 0; j < server.dbnum; j++) {
1889         server.db[j].dict = dictCreate(&dbDictType,NULL);
1890         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
1891         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
1892         server.db[j].ready_keys = dictCreate(&setDictType,NULL);
1893         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
1894         server.db[j].eviction_pool = evictionPoolAlloc();
1895         server.db[j].id = j;
1896         server.db[j].avg_ttl = 0;
1897     }
1898     server.pubsub_channels = dictCreate(&keylistDictType,NULL);
1899     server.pubsub_patterns = listCreate();
1900     listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
1901     listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
1902     server.cronloops = 0;
1903     server.rdb_child_pid = -1;
1904     server.aof_child_pid = -1;
1905     server.rdb_child_type = RDB_CHILD_TYPE_NONE;
1906     aofRewriteBufferReset();
1907     server.aof_buf = sdsempty();
1908     server.lastsave = time(NULL); /* At startup we consider the DB saved. */
1909     server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
1910     server.rdb_save_time_last = -1;
1911     server.rdb_save_time_start = -1;
1912     server.dirty = 0;
1913     resetServerStats();
1914     /* A few stats we don't want to reset: server startup time, and peak mem. */
1915     server.stat_starttime = time(NULL);
1916     server.stat_peak_memory = 0;
1917     server.resident_set_size = 0;
1918     server.lastbgsave_status = C_OK;
1919     server.aof_last_write_status = C_OK;
1920     server.aof_last_write_errno = 0;
1921     server.repl_good_slaves_count = 0;
1922     updateCachedTime();
1923 
1924     /* Create the serverCron() time event, that's our main way to process
1925      * background operations. */
1926     if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
1927         serverPanic("Can't create the serverCron time event.");
1928         exit(1);
1929     }
1930 
1931     /* Create an event handler for accepting new connections in TCP and Unix
1932      * domain sockets. */
1933     for (j = 0; j < server.ipfd_count; j++) {
1934         if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
1935             acceptTcpHandler,NULL) == AE_ERR)
1936             {
1937                 serverPanic(
1938                     "Unrecoverable error creating server.ipfd file event.");
1939             }
1940     }
1941     if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
1942         acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
1943 
1944     /* Open the AOF file if needed. */
1945     if (server.aof_state == AOF_ON) {
1946         server.aof_fd = open(server.aof_filename,
1947                                O_WRONLY|O_APPEND|O_CREAT,0644);
1948         if (server.aof_fd == -1) {
1949             serverLog(LL_WARNING, "Can't open the append-only file: %s",
1950                 strerror(errno));
1951             exit(1);
1952         }
1953     }
1954 
1955     /* 32 bit instances are limited to 4GB of address space, so if there is
1956      * no explicit limit in the user provided configuration we set a limit
1957      * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
1958      * useless crashes of the Redis instance for out of memory. */
1959     if (server.arch_bits == 32 && server.maxmemory == 0) {
1960         serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
1961         server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
1962         server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
1963     }
1964 
1965     if (server.cluster_enabled) clusterInit();
1966     replicationScriptCacheInit();
1967     scriptingInit(1);
1968     slowlogInit();
1969     latencyMonitorInit();
1970     bioInit();
1971 }
1972 
1973 /* Populates the Redis Command Table starting from the hard coded list
1974  * we have on top of redis.c file. */
1975 void populateCommandTable(void) {
1976     int j;
1977     int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
1978 
1979     for (j = 0; j < numcommands; j++) {
1980         struct redisCommand *c = redisCommandTable+j;
1981         char *f = c->sflags;
1982         int retval1, retval2;
1983 
1984         while(*f != '\0') {
1985             switch(*f) {
1986             case 'w': c->flags |= CMD_WRITE; break;
1987             case 'r': c->flags |= CMD_READONLY; break;
1988             case 'm': c->flags |= CMD_DENYOOM; break;
1989             case 'a': c->flags |= CMD_ADMIN; break;
1990             case 'p': c->flags |= CMD_PUBSUB; break;
1991             case 's': c->flags |= CMD_NOSCRIPT; break;
1992             case 'R': c->flags |= CMD_RANDOM; break;
1993             case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
1994             case 'l': c->flags |= CMD_LOADING; break;
1995             case 't': c->flags |= CMD_STALE; break;
1996             case 'M': c->flags |= CMD_SKIP_MONITOR; break;
1997             case 'k': c->flags |= CMD_ASKING; break;
1998             case 'F': c->flags |= CMD_FAST; break;
1999             default: serverPanic("Unsupported command flag"); break;
2000             }
2001             f++;
2002         }
2003 
2004         retval1 = dictAdd(server.commands, sdsnew(c->name), c);
2005         /* Populate an additional dictionary that will be unaffected
2006          * by rename-command statements in redis.conf. */
2007         retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
2008         serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
2009     }
2010 }
2011 
2012 void resetCommandTableStats(void) {
2013     int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
2014     int j;
2015 
2016     for (j = 0; j < numcommands; j++) {
2017         struct redisCommand *c = redisCommandTable+j;
2018 
2019         c->microseconds = 0;
2020         c->calls = 0;
2021     }
2022 }
2023 
2024 /* ========================== Redis OP Array API ============================ */
2025 
2026 void redisOpArrayInit(redisOpArray *oa) {
2027     oa->ops = NULL;
2028     oa->numops = 0;
2029 }
2030 
2031 int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
2032                        robj **argv, int argc, int target)
2033 {
2034     redisOp *op;
2035 
2036     oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
2037     op = oa->ops+oa->numops;
2038     op->cmd = cmd;
2039     op->dbid = dbid;
2040     op->argv = argv;
2041     op->argc = argc;
2042     op->target = target;
2043     oa->numops++;
2044     return oa->numops;
2045 }
2046 
2047 void redisOpArrayFree(redisOpArray *oa) {
2048     while(oa->numops) {
2049         int j;
2050         redisOp *op;
2051 
2052         oa->numops--;
2053         op = oa->ops+oa->numops;
2054         for (j = 0; j < op->argc; j++)
2055             decrRefCount(op->argv[j]);
2056         zfree(op->argv);
2057     }
2058     zfree(oa->ops);
2059 }
2060 
2061 /* ====================== Commands lookup and execution ===================== */
2062 
2063 struct redisCommand *lookupCommand(sds name) {
2064     return dictFetchValue(server.commands, name);
2065 }
2066 
2067 struct redisCommand *lookupCommandByCString(char *s) {
2068     struct redisCommand *cmd;
2069     sds name = sdsnew(s);
2070 
2071     cmd = dictFetchValue(server.commands, name);
2072     sdsfree(name);
2073     return cmd;
2074 }
2075 
2076 /* Lookup the command in the current table, if not found also check in
2077  * the original table containing the original command names unaffected by
2078  * redis.conf rename-command statement.
2079  *
2080  * This is used by functions rewriting the argument vector such as
2081  * rewriteClientCommandVector() in order to set client->cmd pointer
2082  * correctly even if the command was renamed. */
2083 struct redisCommand *lookupCommandOrOriginal(sds name) {
2084     struct redisCommand *cmd = dictFetchValue(server.commands, name);
2085 
2086     if (!cmd) cmd = dictFetchValue(server.orig_commands,name);
2087     return cmd;
2088 }
2089 
2090 /* Propagate the specified command (in the context of the specified database id)
2091  * to AOF and Slaves.
2092  *
2093  * flags are an xor between:
2094  * + PROPAGATE_NONE (no propagation of command at all)
2095  * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
2096  * + PROPAGATE_REPL (propagate into the replication link)
2097  *
2098  * This should not be used inside commands implementation. Use instead
2099  * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
2100  */
2101 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2102                int flags)
2103 {
2104     if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
2105         feedAppendOnlyFile(cmd,dbid,argv,argc);
2106     if (flags & PROPAGATE_REPL)
2107         replicationFeedSlaves(server.slaves,dbid,argv,argc);
2108 }
2109 
2110 /* Used inside commands to schedule the propagation of additional commands
2111  * after the current command is propagated to AOF / Replication.
2112  *
2113  * 'cmd' must be a pointer to the Redis command to replicate, dbid is the
2114  * database ID the command should be propagated into.
2115  * Arguments of the command to propagte are passed as an array of redis
2116  * objects pointers of len 'argc', using the 'argv' vector.
2117  *
2118  * The function does not take a reference to the passed 'argv' vector,
2119  * so it is up to the caller to release the passed argv (but it is usually
2120  * stack allocated).  The function autoamtically increments ref count of
2121  * passed objects, so the caller does not need to. */
2122 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2123                    int target)
2124 {
2125     robj **argvcopy;
2126     int j;
2127 
2128     if (server.loading) return; /* No propagation during loading. */
2129 
2130     argvcopy = zmalloc(sizeof(robj*)*argc);
2131     for (j = 0; j < argc; j++) {
2132         argvcopy[j] = argv[j];
2133         incrRefCount(argv[j]);
2134     }
2135     redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target);
2136 }
2137 
2138 /* It is possible to call the function forceCommandPropagation() inside a
2139  * Redis command implementation in order to to force the propagation of a
2140  * specific command execution into AOF / Replication. */
2141 void forceCommandPropagation(client *c, int flags) {
2142     if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL;
2143     if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
2144 }
2145 
2146 /* Avoid that the executed command is propagated at all. This way we
2147  * are free to just propagate what we want using the alsoPropagate()
2148  * API. */
2149 void preventCommandPropagation(client *c) {
2150     c->flags |= CLIENT_PREVENT_PROP;
2151 }
2152 
2153 /* AOF specific version of preventCommandPropagation(). */
2154 void preventCommandAOF(client *c) {
2155     c->flags |= CLIENT_PREVENT_AOF_PROP;
2156 }
2157 
2158 /* Replication specific version of preventCommandPropagation(). */
2159 void preventCommandReplication(client *c) {
2160     c->flags |= CLIENT_PREVENT_REPL_PROP;
2161 }
2162 
2163 /* Call() is the core of Redis execution of a command.
2164  *
2165  * The following flags can be passed:
2166  * CMD_CALL_NONE        No flags.
2167  * CMD_CALL_SLOWLOG     Check command speed and log in the slow log if needed.
2168  * CMD_CALL_STATS       Populate command stats.
2169  * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
2170  *                          or if the client flags are forcing propagation.
2171  * CMD_CALL_PROPAGATE_REPL  Send command to salves if it modified the dataset
2172  *                          or if the client flags are forcing propagation.
2173  * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
2174  * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
2175  *
2176  * The exact propagation behavior depends on the client flags.
2177  * Specifically:
2178  *
2179  * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
2180  *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
2181  *    in the call flags, then the command is propagated even if the
2182  *    dataset was not affected by the command.
2183  * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
2184  *    are set, the propagation into AOF or to slaves is not performed even
2185  *    if the command modified the dataset.
2186  *
2187  * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
2188  * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
2189  * slaves propagation will never occur.
2190  *
2191  * Client flags are modified by the implementation of a given command
2192  * using the following API:
2193  *
2194  * forceCommandPropagation(client *c, int flags);
2195  * preventCommandPropagation(client *c);
2196  * preventCommandAOF(client *c);
2197  * preventCommandReplication(client *c);
2198  *
2199  */
2200 void call(client *c, int flags) {
2201     long long dirty, start, duration;
2202     int client_old_flags = c->flags;
2203 
2204     /* Sent the command to clients in MONITOR mode, only if the commands are
2205      * not generated from reading an AOF. */
2206     if (listLength(server.monitors) &&
2207         !server.loading &&
2208         !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
2209     {
2210         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
2211     }
2212 
2213     /* Initialization: clear the flags that must be set by the command on
2214      * demand, and initialize the array for additional commands propagation. */
2215     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2216     redisOpArrayInit(&server.also_propagate);
2217 
2218     /* Call the command. */
2219     dirty = server.dirty;
2220     start = ustime();
2221     c->cmd->proc(c);
2222     duration = ustime()-start;
2223     dirty = server.dirty-dirty;
2224     if (dirty < 0) dirty = 0;
2225 
2226     /* When EVAL is called loading the AOF we don't want commands called
2227      * from Lua to go into the slowlog or to populate statistics. */
2228     if (server.loading && c->flags & CLIENT_LUA)
2229         flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
2230 
2231     /* If the caller is Lua, we want to force the EVAL caller to propagate
2232      * the script if the command flag or client flag are forcing the
2233      * propagation. */
2234     if (c->flags & CLIENT_LUA && server.lua_caller) {
2235         if (c->flags & CLIENT_FORCE_REPL)
2236             server.lua_caller->flags |= CLIENT_FORCE_REPL;
2237         if (c->flags & CLIENT_FORCE_AOF)
2238             server.lua_caller->flags |= CLIENT_FORCE_AOF;
2239     }
2240 
2241     /* Log the command into the Slow log if needed, and populate the
2242      * per-command statistics that we show in INFO commandstats. */
2243     if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
2244         char *latency_event = (c->cmd->flags & CMD_FAST) ?
2245                               "fast-command" : "command";
2246         latencyAddSampleIfNeeded(latency_event,duration/1000);
2247         slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
2248     }
2249     if (flags & CMD_CALL_STATS) {
2250         c->lastcmd->microseconds += duration;
2251         c->lastcmd->calls++;
2252     }
2253 
2254     /* Propagate the command into the AOF and replication link */
2255     if (flags & CMD_CALL_PROPAGATE &&
2256         (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
2257     {
2258         int propagate_flags = PROPAGATE_NONE;
2259 
2260         /* Check if the command operated changes in the data set. If so
2261          * set for replication / AOF propagation. */
2262         if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
2263 
2264         /* If the client forced AOF / replication of the command, set
2265          * the flags regardless of the command effects on the data set. */
2266         if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
2267         if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
2268 
2269         /* However prevent AOF / replication propagation if the command
2270          * implementatino called preventCommandPropagation() or similar,
2271          * or if we don't have the call() flags to do so. */
2272         if (c->flags & CLIENT_PREVENT_REPL_PROP ||
2273             !(flags & CMD_CALL_PROPAGATE_REPL))
2274                 propagate_flags &= ~PROPAGATE_REPL;
2275         if (c->flags & CLIENT_PREVENT_AOF_PROP ||
2276             !(flags & CMD_CALL_PROPAGATE_AOF))
2277                 propagate_flags &= ~PROPAGATE_AOF;
2278 
2279         /* Call propagate() only if at least one of AOF / replication
2280          * propagation is needed. */
2281         if (propagate_flags != PROPAGATE_NONE)
2282             propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
2283     }
2284 
2285     /* Restore the old replication flags, since call() can be executed
2286      * recursively. */
2287     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2288     c->flags |= client_old_flags &
2289         (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2290 
2291     /* Handle the alsoPropagate() API to handle commands that want to propagate
2292      * multiple separated commands. Note that alsoPropagate() is not affected
2293      * by CLIENT_PREVENT_PROP flag. */
2294     if (server.also_propagate.numops) {
2295         int j;
2296         redisOp *rop;
2297 
2298         if (flags & CMD_CALL_PROPAGATE) {
2299             for (j = 0; j < server.also_propagate.numops; j++) {
2300                 rop = &server.also_propagate.ops[j];
2301                 int target = rop->target;
2302                 /* Whatever the command wish is, we honor the call() flags. */
2303                 if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
2304                 if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
2305                 if (target)
2306                     propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
2307             }
2308         }
2309         redisOpArrayFree(&server.also_propagate);
2310     }
2311     server.stat_numcommands++;
2312 }
2313 
2314 /* If this function gets called we already read a whole
2315  * command, arguments are in the client argv/argc fields.
2316  * processCommand() execute the command or prepare the
2317  * server for a bulk read from the client.
2318  *
2319  * If C_OK is returned the client is still alive and valid and
2320  * other operations can be performed by the caller. Otherwise
2321  * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
2322 int processCommand(client *c) {
2323     /* The QUIT command is handled separately. Normal command procs will
2324      * go through checking for replication and QUIT will cause trouble
2325      * when FORCE_REPLICATION is enabled and would be implemented in
2326      * a regular command proc. */
2327     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
2328         addReply(c,shared.ok);
2329         c->flags |= CLIENT_CLOSE_AFTER_REPLY;
2330         return C_ERR;
2331     }
2332 
2333     /* Now lookup the command and check ASAP about trivial error conditions
2334      * such as wrong arity, bad command name and so forth. */
2335     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
2336     if (!c->cmd) {
2337         flagTransaction(c);
2338         addReplyErrorFormat(c,"unknown command '%s'",
2339             (char*)c->argv[0]->ptr);
2340         return C_OK;
2341     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
2342                (c->argc < -c->cmd->arity)) {
2343         flagTransaction(c);
2344         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2345             c->cmd->name);
2346         return C_OK;
2347     }
2348 
2349     /* Check if the user is authenticated */
2350     if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
2351     {
2352         flagTransaction(c);
2353         addReply(c,shared.noautherr);
2354         return C_OK;
2355     }
2356 
2357     /* If cluster is enabled perform the cluster redirection here.
2358      * However we don't perform the redirection if:
2359      * 1) The sender of this command is our master.
2360      * 2) The command has no key arguments. */
2361     if (server.cluster_enabled &&
2362         !(c->flags & CLIENT_MASTER) &&
2363         !(c->flags & CLIENT_LUA &&
2364           server.lua_caller->flags & CLIENT_MASTER) &&
2365         !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
2366           c->cmd->proc != execCommand))
2367     {
2368         int hashslot;
2369         int error_code;
2370         clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
2371                                         &hashslot,&error_code);
2372         if (n == NULL || n != server.cluster->myself) {
2373             if (c->cmd->proc == execCommand) {
2374                 discardTransaction(c);
2375             } else {
2376                 flagTransaction(c);
2377             }
2378             clusterRedirectClient(c,n,hashslot,error_code);
2379             return C_OK;
2380         }
2381     }
2382 
2383     /* Handle the maxmemory directive.
2384      *
2385      * First we try to free some memory if possible (if there are volatile
2386      * keys in the dataset). If there are not the only thing we can do
2387      * is returning an error. */
2388     if (server.maxmemory) {
2389         int retval = freeMemoryIfNeeded();
2390         /* freeMemoryIfNeeded may flush slave output buffers. This may result
2391          * into a slave, that may be the active client, to be freed. */
2392         if (server.current_client == NULL) return C_ERR;
2393 
2394         /* It was impossible to free enough memory, and the command the client
2395          * is trying to execute is denied during OOM conditions? Error. */
2396         if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
2397             flagTransaction(c);
2398             addReply(c, shared.oomerr);
2399             return C_OK;
2400         }
2401     }
2402 
2403     /* Don't accept write commands if there are problems persisting on disk
2404      * and if this is a master instance. */
2405     if (((server.stop_writes_on_bgsave_err &&
2406           server.saveparamslen > 0 &&
2407           server.lastbgsave_status == C_ERR) ||
2408           server.aof_last_write_status == C_ERR) &&
2409         server.masterhost == NULL &&
2410         (c->cmd->flags & CMD_WRITE ||
2411          c->cmd->proc == pingCommand))
2412     {
2413         flagTransaction(c);
2414         if (server.aof_last_write_status == C_OK)
2415             addReply(c, shared.bgsaveerr);
2416         else
2417             addReplySds(c,
2418                 sdscatprintf(sdsempty(),
2419                 "-MISCONF Errors writing to the AOF file: %s\r\n",
2420                 strerror(server.aof_last_write_errno)));
2421         return C_OK;
2422     }
2423 
2424     /* Don't accept write commands if there are not enough good slaves and
2425      * user configured the min-slaves-to-write option. */
2426     if (server.masterhost == NULL &&
2427         server.repl_min_slaves_to_write &&
2428         server.repl_min_slaves_max_lag &&
2429         c->cmd->flags & CMD_WRITE &&
2430         server.repl_good_slaves_count < server.repl_min_slaves_to_write)
2431     {
2432         flagTransaction(c);
2433         addReply(c, shared.noreplicaserr);
2434         return C_OK;
2435     }
2436 
2437     /* Don't accept write commands if this is a read only slave. But
2438      * accept write commands if this is our master. */
2439     if (server.masterhost && server.repl_slave_ro &&
2440         !(c->flags & CLIENT_MASTER) &&
2441         c->cmd->flags & CMD_WRITE)
2442     {
2443         addReply(c, shared.roslaveerr);
2444         return C_OK;
2445     }
2446 
2447     /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
2448     if (c->flags & CLIENT_PUBSUB &&
2449         c->cmd->proc != pingCommand &&
2450         c->cmd->proc != subscribeCommand &&
2451         c->cmd->proc != unsubscribeCommand &&
2452         c->cmd->proc != psubscribeCommand &&
2453         c->cmd->proc != punsubscribeCommand) {
2454         addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
2455         return C_OK;
2456     }
2457 
2458     /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
2459      * we are a slave with a broken link with master. */
2460     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
2461         server.repl_serve_stale_data == 0 &&
2462         !(c->cmd->flags & CMD_STALE))
2463     {
2464         flagTransaction(c);
2465         addReply(c, shared.masterdownerr);
2466         return C_OK;
2467     }
2468 
2469     /* Loading DB? Return an error if the command has not the
2470      * CMD_LOADING flag. */
2471     if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
2472         addReply(c, shared.loadingerr);
2473         return C_OK;
2474     }
2475 
2476     /* Lua script too slow? Only allow a limited number of commands. */
2477     if (server.lua_timedout &&
2478           c->cmd->proc != authCommand &&
2479           c->cmd->proc != replconfCommand &&
2480         !(c->cmd->proc == shutdownCommand &&
2481           c->argc == 2 &&
2482           tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
2483         !(c->cmd->proc == scriptCommand &&
2484           c->argc == 2 &&
2485           tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
2486     {
2487         flagTransaction(c);
2488         addReply(c, shared.slowscripterr);
2489         return C_OK;
2490     }
2491 
2492     /* Exec the command */
2493     if (c->flags & CLIENT_MULTI &&
2494         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
2495         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
2496     {
2497         queueMultiCommand(c);
2498         addReply(c,shared.queued);
2499     } else {
2500         call(c,CMD_CALL_FULL);
2501         c->woff = server.master_repl_offset;
2502         if (listLength(server.ready_keys))
2503             handleClientsBlockedOnLists();
2504     }
2505     return C_OK;
2506 }
2507 
2508 /*================================== Shutdown =============================== */
2509 
2510 /* Close listening sockets. Also unlink the unix domain socket if
2511  * unlink_unix_socket is non-zero. */
2512 void closeListeningSockets(int unlink_unix_socket) {
2513     int j;
2514 
2515     for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]);
2516     if (server.sofd != -1) close(server.sofd);
2517     if (server.cluster_enabled)
2518         for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]);
2519     if (unlink_unix_socket && server.unixsocket) {
2520         serverLog(LL_NOTICE,"Removing the unix socket file.");
2521         unlink(server.unixsocket); /* don't care if this fails */
2522     }
2523 }
2524 
2525 int prepareForShutdown(int flags) {
2526     int save = flags & SHUTDOWN_SAVE;
2527     int nosave = flags & SHUTDOWN_NOSAVE;
2528 
2529     serverLog(LL_WARNING,"User requested shutdown...");
2530 
2531     /* Kill all the Lua debugger forked sessions. */
2532     ldbKillForkedSessions();
2533 
2534     /* Kill the saving child if there is a background saving in progress.
2535        We want to avoid race conditions, for instance our saving child may
2536        overwrite the synchronous saving did by SHUTDOWN. */
2537     if (server.rdb_child_pid != -1) {
2538         serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
2539         kill(server.rdb_child_pid,SIGUSR1);
2540         rdbRemoveTempFile(server.rdb_child_pid);
2541     }
2542 
2543     if (server.aof_state != AOF_OFF) {
2544         /* Kill the AOF saving child as the AOF we already have may be longer
2545          * but contains the full dataset anyway. */
2546         if (server.aof_child_pid != -1) {
2547             /* If we have AOF enabled but haven't written the AOF yet, don't
2548              * shutdown or else the dataset will be lost. */
2549             if (server.aof_state == AOF_WAIT_REWRITE) {
2550                 serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
2551                 return C_ERR;
2552             }
2553             serverLog(LL_WARNING,
2554                 "There is a child rewriting the AOF. Killing it!");
2555             kill(server.aof_child_pid,SIGUSR1);
2556         }
2557         /* Append only file: fsync() the AOF and exit */
2558         serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
2559         aof_fsync(server.aof_fd);
2560     }
2561 
2562     /* Create a new RDB file before exiting. */
2563     if ((server.saveparamslen > 0 && !nosave) || save) {
2564         serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
2565         /* Snapshotting. Perform a SYNC SAVE and exit */
2566         if (rdbSave(server.rdb_filename) != C_OK) {
2567             /* Ooops.. error saving! The best we can do is to continue
2568              * operating. Note that if there was a background saving process,
2569              * in the next cron() Redis will be notified that the background
2570              * saving aborted, handling special stuff like slaves pending for
2571              * synchronization... */
2572             serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
2573             return C_ERR;
2574         }
2575     }
2576 
2577     /* Remove the pid file if possible and needed. */
2578     if (server.daemonize || server.pidfile) {
2579         serverLog(LL_NOTICE,"Removing the pid file.");
2580         unlink(server.pidfile);
2581     }
2582 
2583     /* Best effort flush of slave output buffers, so that we hopefully
2584      * send them pending writes. */
2585     flushSlavesOutputBuffers();
2586 
2587     /* Close the listening sockets. Apparently this allows faster restarts. */
2588     closeListeningSockets(1);
2589     serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
2590         server.sentinel_mode ? "Sentinel" : "Redis");
2591     return C_OK;
2592 }
2593 
2594 /*================================== Commands =============================== */
2595 
2596 /* Return zero if strings are the same, non-zero if they are not.
2597  * The comparison is performed in a way that prevents an attacker to obtain
2598  * information about the nature of the strings just monitoring the execution
2599  * time of the function.
2600  *
2601  * Note that limiting the comparison length to strings up to 512 bytes we
2602  * can avoid leaking any information about the password length and any
2603  * possible branch misprediction related leak.
2604  */
2605 int time_independent_strcmp(char *a, char *b) {
2606     char bufa[CONFIG_AUTHPASS_MAX_LEN], bufb[CONFIG_AUTHPASS_MAX_LEN];
2607     /* The above two strlen perform len(a) + len(b) operations where either
2608      * a or b are fixed (our password) length, and the difference is only
2609      * relative to the length of the user provided string, so no information
2610      * leak is possible in the following two lines of code. */
2611     unsigned int alen = strlen(a);
2612     unsigned int blen = strlen(b);
2613     unsigned int j;
2614     int diff = 0;
2615 
2616     /* We can't compare strings longer than our static buffers.
2617      * Note that this will never pass the first test in practical circumstances
2618      * so there is no info leak. */
2619     if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;
2620 
2621     memset(bufa,0,sizeof(bufa));        /* Constant time. */
2622     memset(bufb,0,sizeof(bufb));        /* Constant time. */
2623     /* Again the time of the following two copies is proportional to
2624      * len(a) + len(b) so no info is leaked. */
2625     memcpy(bufa,a,alen);
2626     memcpy(bufb,b,blen);
2627 
2628     /* Always compare all the chars in the two buffers without
2629      * conditional expressions. */
2630     for (j = 0; j < sizeof(bufa); j++) {
2631         diff |= (bufa[j] ^ bufb[j]);
2632     }
2633     /* Length must be equal as well. */
2634     diff |= alen ^ blen;
2635     return diff; /* If zero strings are the same. */
2636 }
2637 
2638 void authCommand(client *c) {
2639     if (!server.requirepass) {
2640         addReplyError(c,"Client sent AUTH, but no password is set");
2641     } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
2642       c->authenticated = 1;
2643       addReply(c,shared.ok);
2644     } else {
2645       c->authenticated = 0;
2646       addReplyError(c,"invalid password");
2647     }
2648 }
2649 
2650 /* The PING command. It works in a different way if the client is in
2651  * in Pub/Sub mode. */
2652 void pingCommand(client *c) {
2653     /* The command takes zero or one arguments. */
2654     if (c->argc > 2) {
2655         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2656             c->cmd->name);
2657         return;
2658     }
2659 
2660     if (c->flags & CLIENT_PUBSUB) {
2661         addReply(c,shared.mbulkhdr[2]);
2662         addReplyBulkCBuffer(c,"pong",4);
2663         if (c->argc == 1)
2664             addReplyBulkCBuffer(c,"",0);
2665         else
2666             addReplyBulk(c,c->argv[1]);
2667     } else {
2668         if (c->argc == 1)
2669             addReply(c,shared.pong);
2670         else
2671             addReplyBulk(c,c->argv[1]);
2672     }
2673 }
2674 
2675 void echoCommand(client *c) {
2676     addReplyBulk(c,c->argv[1]);
2677 }
2678 
2679 void timeCommand(client *c) {
2680     struct timeval tv;
2681 
2682     /* gettimeofday() can only fail if &tv is a bad address so we
2683      * don't check for errors. */
2684     gettimeofday(&tv,NULL);
2685     addReplyMultiBulkLen(c,2);
2686     addReplyBulkLongLong(c,tv.tv_sec);
2687     addReplyBulkLongLong(c,tv.tv_usec);
2688 }
2689 
2690 /* Helper function for addReplyCommand() to output flags. */
2691 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) {
2692     if (cmd->flags & f) {
2693         addReplyStatus(c, reply);
2694         return 1;
2695     }
2696     return 0;
2697 }
2698 
2699 /* Output the representation of a Redis command. Used by the COMMAND command. */
2700 void addReplyCommand(client *c, struct redisCommand *cmd) {
2701     if (!cmd) {
2702         addReply(c, shared.nullbulk);
2703     } else {
2704         /* We are adding: command name, arg count, flags, first, last, offset */
2705         addReplyMultiBulkLen(c, 6);
2706         addReplyBulkCString(c, cmd->name);
2707         addReplyLongLong(c, cmd->arity);
2708 
2709         int flagcount = 0;
2710         void *flaglen = addDeferredMultiBulkLength(c);
2711         flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write");
2712         flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly");
2713         flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom");
2714         flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin");
2715         flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub");
2716         flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript");
2717         flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random");
2718         flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script");
2719         flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading");
2720         flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale");
2721         flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor");
2722         flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
2723         flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
2724         if (cmd->getkeys_proc) {
2725             addReplyStatus(c, "movablekeys");
2726             flagcount += 1;
2727         }
2728         setDeferredMultiBulkLength(c, flaglen, flagcount);
2729 
2730         addReplyLongLong(c, cmd->firstkey);
2731         addReplyLongLong(c, cmd->lastkey);
2732         addReplyLongLong(c, cmd->keystep);
2733     }
2734 }
2735 
2736 /* COMMAND <subcommand> <args> */
2737 void commandCommand(client *c) {
2738     dictIterator *di;
2739     dictEntry *de;
2740 
2741     if (c->argc == 1) {
2742         addReplyMultiBulkLen(c, dictSize(server.commands));
2743         di = dictGetIterator(server.commands);
2744         while ((de = dictNext(di)) != NULL) {
2745             addReplyCommand(c, dictGetVal(de));
2746         }
2747         dictReleaseIterator(di);
2748     } else if (!strcasecmp(c->argv[1]->ptr, "info")) {
2749         int i;
2750         addReplyMultiBulkLen(c, c->argc-2);
2751         for (i = 2; i < c->argc; i++) {
2752             addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr));
2753         }
2754     } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) {
2755         addReplyLongLong(c, dictSize(server.commands));
2756     } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) {
2757         struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
2758         int *keys, numkeys, j;
2759 
2760         if (!cmd) {
2761             addReplyErrorFormat(c,"Invalid command specified");
2762             return;
2763         } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) ||
2764                    ((c->argc-2) < -cmd->arity))
2765         {
2766             addReplyError(c,"Invalid number of arguments specified for command");
2767             return;
2768         }
2769 
2770         keys = getKeysFromCommand(cmd,c->argv+2,c->argc-2,&numkeys);
2771         addReplyMultiBulkLen(c,numkeys);
2772         for (j = 0; j < numkeys; j++) addReplyBulk(c,c->argv[keys[j]+2]);
2773         getKeysFreeResult(keys);
2774     } else {
2775         addReplyError(c, "Unknown subcommand or wrong number of arguments.");
2776         return;
2777     }
2778 }
2779 
/* Convert an amount of bytes into a human readable string in the form
 * of 100B, 2G, 100M, 4K, and so forth.
 *
 * Values below 1024 are printed as an integer followed by 'B'. Larger
 * values are divided by the biggest power of 1024 not greater than them
 * (up to 1024^5) and printed with two decimals followed by K, M, G, T or P.
 * Values of 1024^6 or more are printed again as a plain number of bytes.
 *
 * 's' must point to a buffer large enough for the result: no size check
 * is performed here. */
void bytesToHuman(char *s, unsigned long long n) {
    static const char units[] = "KMGTP";
    unsigned long long scale = 1024ULL;
    int u;

    if (n < scale) {
        /* Bytes */
        sprintf(s,"%lluB",n);
        return;
    }

    /* 'scale' is the divisor of the current unit: the unit applies when
     * 'n' is smaller than the divisor of the next one. */
    for (u = 0; units[u] != '\0'; u++) {
        if (n < scale*1024ULL) {
            sprintf(s,"%.2f%c",(double)n/(double)scale,units[u]);
            return;
        }
        scale *= 1024ULL;
    }

    /* Let's hope we never need this */
    sprintf(s,"%lluB",n);
}
2809 
/* Create the string returned by the INFO command. This is decoupled
 * from the INFO command itself as we need to report the same information
 * on memory corruption problems. */
2813 sds genRedisInfoString(char *section) {
2814     sds info = sdsempty();
2815     time_t uptime = server.unixtime-server.stat_starttime;
2816     int j, numcommands;
2817     struct rusage self_ru, c_ru;
2818     unsigned long lol, bib;
2819     int allsections = 0, defsections = 0;
2820     int sections = 0;
2821 
2822     if (section == NULL) section = "default";
2823     allsections = strcasecmp(section,"all") == 0;
2824     defsections = strcasecmp(section,"default") == 0;
2825 
2826     getrusage(RUSAGE_SELF, &self_ru);
2827     getrusage(RUSAGE_CHILDREN, &c_ru);
2828     getClientsMaxBuffers(&lol,&bib);
2829 
2830     /* Server */
2831     if (allsections || defsections || !strcasecmp(section,"server")) {
2832         static int call_uname = 1;
2833         static struct utsname name;
2834         char *mode;
2835 
2836         if (server.cluster_enabled) mode = "cluster";
2837         else if (server.sentinel_mode) mode = "sentinel";
2838         else mode = "standalone";
2839 
2840         if (sections++) info = sdscat(info,"\r\n");
2841 
2842         if (call_uname) {
2843             /* Uname can be slow and is always the same output. Cache it. */
2844             uname(&name);
2845             call_uname = 0;
2846         }
2847 
2848         info = sdscatprintf(info,
2849             "# Server\r\n"
2850             "redis_version:%s\r\n"
2851             "redis_git_sha1:%s\r\n"
2852             "redis_git_dirty:%d\r\n"
2853             "redis_build_id:%llx\r\n"
2854             "redis_mode:%s\r\n"
2855             "os:%s %s %s\r\n"
2856             "arch_bits:%d\r\n"
2857             "multiplexing_api:%s\r\n"
2858             "gcc_version:%d.%d.%d\r\n"
2859             "process_id:%ld\r\n"
2860             "run_id:%s\r\n"
2861             "tcp_port:%d\r\n"
2862             "uptime_in_seconds:%jd\r\n"
2863             "uptime_in_days:%jd\r\n"
2864             "hz:%d\r\n"
2865             "lru_clock:%ld\r\n"
2866             "executable:%s\r\n"
2867             "config_file:%s\r\n",
2868             REDIS_VERSION,
2869             redisGitSHA1(),
2870             strtol(redisGitDirty(),NULL,10) > 0,
2871             (unsigned long long) redisBuildId(),
2872             mode,
2873             name.sysname, name.release, name.machine,
2874             server.arch_bits,
2875             aeGetApiName(),
2876 #ifdef __GNUC__
2877             __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
2878 #else
2879             0,0,0,
2880 #endif
2881             (long) getpid(),
2882             server.runid,
2883             server.port,
2884             (intmax_t)uptime,
2885             (intmax_t)(uptime/(3600*24)),
2886             server.hz,
2887             (unsigned long) server.lruclock,
2888             server.executable ? server.executable : "",
2889             server.configfile ? server.configfile : "");
2890     }
2891 
2892     /* Clients */
2893     if (allsections || defsections || !strcasecmp(section,"clients")) {
2894         if (sections++) info = sdscat(info,"\r\n");
2895         info = sdscatprintf(info,
2896             "# Clients\r\n"
2897             "connected_clients:%lu\r\n"
2898             "client_longest_output_list:%lu\r\n"
2899             "client_biggest_input_buf:%lu\r\n"
2900             "blocked_clients:%d\r\n",
2901             listLength(server.clients)-listLength(server.slaves),
2902             lol, bib,
2903             server.bpop_blocked_clients);
2904     }
2905 
2906     /* Memory */
2907     if (allsections || defsections || !strcasecmp(section,"memory")) {
2908         char hmem[64];
2909         char peak_hmem[64];
2910         char total_system_hmem[64];
2911         char used_memory_lua_hmem[64];
2912         char used_memory_rss_hmem[64];
2913         char maxmemory_hmem[64];
2914         size_t zmalloc_used = zmalloc_used_memory();
2915         size_t total_system_mem = server.system_memory_size;
2916         const char *evict_policy = evictPolicyToString();
2917         long long memory_lua = (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024;
2918 
2919         /* Peak memory is updated from time to time by serverCron() so it
2920          * may happen that the instantaneous value is slightly bigger than
2921          * the peak value. This may confuse users, so we update the peak
2922          * if found smaller than the current memory usage. */
2923         if (zmalloc_used > server.stat_peak_memory)
2924             server.stat_peak_memory = zmalloc_used;
2925 
2926         bytesToHuman(hmem,zmalloc_used);
2927         bytesToHuman(peak_hmem,server.stat_peak_memory);
2928         bytesToHuman(total_system_hmem,total_system_mem);
2929         bytesToHuman(used_memory_lua_hmem,memory_lua);
2930         bytesToHuman(used_memory_rss_hmem,server.resident_set_size);
2931         bytesToHuman(maxmemory_hmem,server.maxmemory);
2932 
2933         if (sections++) info = sdscat(info,"\r\n");
2934         info = sdscatprintf(info,
2935             "# Memory\r\n"
2936             "used_memory:%zu\r\n"
2937             "used_memory_human:%s\r\n"
2938             "used_memory_rss:%zu\r\n"
2939             "used_memory_rss_human:%s\r\n"
2940             "used_memory_peak:%zu\r\n"
2941             "used_memory_peak_human:%s\r\n"
2942             "total_system_memory:%lu\r\n"
2943             "total_system_memory_human:%s\r\n"
2944             "used_memory_lua:%lld\r\n"
2945             "used_memory_lua_human:%s\r\n"
2946             "maxmemory:%lld\r\n"
2947             "maxmemory_human:%s\r\n"
2948             "maxmemory_policy:%s\r\n"
2949             "mem_fragmentation_ratio:%.2f\r\n"
2950             "mem_allocator:%s\r\n",
2951             zmalloc_used,
2952             hmem,
2953             server.resident_set_size,
2954             used_memory_rss_hmem,
2955             server.stat_peak_memory,
2956             peak_hmem,
2957             (unsigned long)total_system_mem,
2958             total_system_hmem,
2959             memory_lua,
2960             used_memory_lua_hmem,
2961             server.maxmemory,
2962             maxmemory_hmem,
2963             evict_policy,
2964             zmalloc_get_fragmentation_ratio(server.resident_set_size),
2965             ZMALLOC_LIB
2966             );
2967     }
2968 
2969     /* Persistence */
2970     if (allsections || defsections || !strcasecmp(section,"persistence")) {
2971         if (sections++) info = sdscat(info,"\r\n");
2972         info = sdscatprintf(info,
2973             "# Persistence\r\n"
2974             "loading:%d\r\n"
2975             "rdb_changes_since_last_save:%lld\r\n"
2976             "rdb_bgsave_in_progress:%d\r\n"
2977             "rdb_last_save_time:%jd\r\n"
2978             "rdb_last_bgsave_status:%s\r\n"
2979             "rdb_last_bgsave_time_sec:%jd\r\n"
2980             "rdb_current_bgsave_time_sec:%jd\r\n"
2981             "aof_enabled:%d\r\n"
2982             "aof_rewrite_in_progress:%d\r\n"
2983             "aof_rewrite_scheduled:%d\r\n"
2984             "aof_last_rewrite_time_sec:%jd\r\n"
2985             "aof_current_rewrite_time_sec:%jd\r\n"
2986             "aof_last_bgrewrite_status:%s\r\n"
2987             "aof_last_write_status:%s\r\n",
2988             server.loading,
2989             server.dirty,
2990             server.rdb_child_pid != -1,
2991             (intmax_t)server.lastsave,
2992             (server.lastbgsave_status == C_OK) ? "ok" : "err",
2993             (intmax_t)server.rdb_save_time_last,
2994             (intmax_t)((server.rdb_child_pid == -1) ?
2995                 -1 : time(NULL)-server.rdb_save_time_start),
2996             server.aof_state != AOF_OFF,
2997             server.aof_child_pid != -1,
2998             server.aof_rewrite_scheduled,
2999             (intmax_t)server.aof_rewrite_time_last,
3000             (intmax_t)((server.aof_child_pid == -1) ?
3001                 -1 : time(NULL)-server.aof_rewrite_time_start),
3002             (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
3003             (server.aof_last_write_status == C_OK) ? "ok" : "err");
3004 
3005         if (server.aof_state != AOF_OFF) {
3006             info = sdscatprintf(info,
3007                 "aof_current_size:%lld\r\n"
3008                 "aof_base_size:%lld\r\n"
3009                 "aof_pending_rewrite:%d\r\n"
3010                 "aof_buffer_length:%zu\r\n"
3011                 "aof_rewrite_buffer_length:%lu\r\n"
3012                 "aof_pending_bio_fsync:%llu\r\n"
3013                 "aof_delayed_fsync:%lu\r\n",
3014                 (long long) server.aof_current_size,
3015                 (long long) server.aof_rewrite_base_size,
3016                 server.aof_rewrite_scheduled,
3017                 sdslen(server.aof_buf),
3018                 aofRewriteBufferSize(),
3019                 bioPendingJobsOfType(BIO_AOF_FSYNC),
3020                 server.aof_delayed_fsync);
3021         }
3022 
3023         if (server.loading) {
3024             double perc;
3025             time_t eta, elapsed;
3026             off_t remaining_bytes = server.loading_total_bytes-
3027                                     server.loading_loaded_bytes;
3028 
3029             perc = ((double)server.loading_loaded_bytes /
3030                    (server.loading_total_bytes+1)) * 100;
3031 
3032             elapsed = time(NULL)-server.loading_start_time;
3033             if (elapsed == 0) {
3034                 eta = 1; /* A fake 1 second figure if we don't have
3035                             enough info */
3036             } else {
3037                 eta = (elapsed*remaining_bytes)/(server.loading_loaded_bytes+1);
3038             }
3039 
3040             info = sdscatprintf(info,
3041                 "loading_start_time:%jd\r\n"
3042                 "loading_total_bytes:%llu\r\n"
3043                 "loading_loaded_bytes:%llu\r\n"
3044                 "loading_loaded_perc:%.2f\r\n"
3045                 "loading_eta_seconds:%jd\r\n",
3046                 (intmax_t) server.loading_start_time,
3047                 (unsigned long long) server.loading_total_bytes,
3048                 (unsigned long long) server.loading_loaded_bytes,
3049                 perc,
3050                 (intmax_t)eta
3051             );
3052         }
3053     }
3054 
3055     /* Stats */
3056     if (allsections || defsections || !strcasecmp(section,"stats")) {
3057         if (sections++) info = sdscat(info,"\r\n");
3058         info = sdscatprintf(info,
3059             "# Stats\r\n"
3060             "total_connections_received:%lld\r\n"
3061             "total_commands_processed:%lld\r\n"
3062             "instantaneous_ops_per_sec:%lld\r\n"
3063             "total_net_input_bytes:%lld\r\n"
3064             "total_net_output_bytes:%lld\r\n"
3065             "instantaneous_input_kbps:%.2f\r\n"
3066             "instantaneous_output_kbps:%.2f\r\n"
3067             "rejected_connections:%lld\r\n"
3068             "sync_full:%lld\r\n"
3069             "sync_partial_ok:%lld\r\n"
3070             "sync_partial_err:%lld\r\n"
3071             "expired_keys:%lld\r\n"
3072             "evicted_keys:%lld\r\n"
3073             "keyspace_hits:%lld\r\n"
3074             "keyspace_misses:%lld\r\n"
3075             "pubsub_channels:%ld\r\n"
3076             "pubsub_patterns:%lu\r\n"
3077             "latest_fork_usec:%lld\r\n"
3078             "migrate_cached_sockets:%ld\r\n",
3079             server.stat_numconnections,
3080             server.stat_numcommands,
3081             getInstantaneousMetric(STATS_METRIC_COMMAND),
3082             server.stat_net_input_bytes,
3083             server.stat_net_output_bytes,
3084             (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
3085             (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
3086             server.stat_rejected_conn,
3087             server.stat_sync_full,
3088             server.stat_sync_partial_ok,
3089             server.stat_sync_partial_err,
3090             server.stat_expiredkeys,
3091             server.stat_evictedkeys,
3092             server.stat_keyspace_hits,
3093             server.stat_keyspace_misses,
3094             dictSize(server.pubsub_channels),
3095             listLength(server.pubsub_patterns),
3096             server.stat_fork_time,
3097             dictSize(server.migrate_cached_sockets));
3098     }
3099 
3100     /* Replication */
3101     if (allsections || defsections || !strcasecmp(section,"replication")) {
3102         if (sections++) info = sdscat(info,"\r\n");
3103         info = sdscatprintf(info,
3104             "# Replication\r\n"
3105             "role:%s\r\n",
3106             server.masterhost == NULL ? "master" : "slave");
3107         if (server.masterhost) {
3108             long long slave_repl_offset = 1;
3109 
3110             if (server.master)
3111                 slave_repl_offset = server.master->reploff;
3112             else if (server.cached_master)
3113                 slave_repl_offset = server.cached_master->reploff;
3114 
3115             info = sdscatprintf(info,
3116                 "master_host:%s\r\n"
3117                 "master_port:%d\r\n"
3118                 "master_link_status:%s\r\n"
3119                 "master_last_io_seconds_ago:%d\r\n"
3120                 "master_sync_in_progress:%d\r\n"
3121                 "slave_repl_offset:%lld\r\n"
3122                 ,server.masterhost,
3123                 server.masterport,
3124                 (server.repl_state == REPL_STATE_CONNECTED) ?
3125                     "up" : "down",
3126                 server.master ?
3127                 ((int)(server.unixtime-server.master->lastinteraction)) : -1,
3128                 server.repl_state == REPL_STATE_TRANSFER,
3129                 slave_repl_offset
3130             );
3131 
3132             if (server.repl_state == REPL_STATE_TRANSFER) {
3133                 info = sdscatprintf(info,
3134                     "master_sync_left_bytes:%lld\r\n"
3135                     "master_sync_last_io_seconds_ago:%d\r\n"
3136                     , (long long)
3137                         (server.repl_transfer_size - server.repl_transfer_read),
3138                     (int)(server.unixtime-server.repl_transfer_lastio)
3139                 );
3140             }
3141 
3142             if (server.repl_state != REPL_STATE_CONNECTED) {
3143                 info = sdscatprintf(info,
3144                     "master_link_down_since_seconds:%jd\r\n",
3145                     (intmax_t)server.unixtime-server.repl_down_since);
3146             }
3147             info = sdscatprintf(info,
3148                 "slave_priority:%d\r\n"
3149                 "slave_read_only:%d\r\n",
3150                 server.slave_priority,
3151                 server.repl_slave_ro);
3152         }
3153 
3154         info = sdscatprintf(info,
3155             "connected_slaves:%lu\r\n",
3156             listLength(server.slaves));
3157 
3158         /* If min-slaves-to-write is active, write the number of slaves
3159          * currently considered 'good'. */
3160         if (server.repl_min_slaves_to_write &&
3161             server.repl_min_slaves_max_lag) {
3162             info = sdscatprintf(info,
3163                 "min_slaves_good_slaves:%d\r\n",
3164                 server.repl_good_slaves_count);
3165         }
3166 
3167         if (listLength(server.slaves)) {
3168             int slaveid = 0;
3169             listNode *ln;
3170             listIter li;
3171 
3172             listRewind(server.slaves,&li);
3173             while((ln = listNext(&li))) {
3174                 client *slave = listNodeValue(ln);
3175                 char *state = NULL;
3176                 char ip[NET_IP_STR_LEN];
3177                 int port;
3178                 long lag = 0;
3179 
3180                 if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1) continue;
3181                 switch(slave->replstate) {
3182                 case SLAVE_STATE_WAIT_BGSAVE_START:
3183                 case SLAVE_STATE_WAIT_BGSAVE_END:
3184                     state = "wait_bgsave";
3185                     break;
3186                 case SLAVE_STATE_SEND_BULK:
3187                     state = "send_bulk";
3188                     break;
3189                 case SLAVE_STATE_ONLINE:
3190                     state = "online";
3191                     break;
3192                 }
3193                 if (state == NULL) continue;
3194                 if (slave->replstate == SLAVE_STATE_ONLINE)
3195                     lag = time(NULL) - slave->repl_ack_time;
3196 
3197                 info = sdscatprintf(info,
3198                     "slave%d:ip=%s,port=%d,state=%s,"
3199                     "offset=%lld,lag=%ld\r\n",
3200                     slaveid,ip,slave->slave_listening_port,state,
3201                     slave->repl_ack_off, lag);
3202                 slaveid++;
3203             }
3204         }
3205         info = sdscatprintf(info,
3206             "master_repl_offset:%lld\r\n"
3207             "repl_backlog_active:%d\r\n"
3208             "repl_backlog_size:%lld\r\n"
3209             "repl_backlog_first_byte_offset:%lld\r\n"
3210             "repl_backlog_histlen:%lld\r\n",
3211             server.master_repl_offset,
3212             server.repl_backlog != NULL,
3213             server.repl_backlog_size,
3214             server.repl_backlog_off,
3215             server.repl_backlog_histlen);
3216     }
3217 
3218     /* CPU */
3219     if (allsections || defsections || !strcasecmp(section,"cpu")) {
3220         if (sections++) info = sdscat(info,"\r\n");
3221         info = sdscatprintf(info,
3222         "# CPU\r\n"
3223         "used_cpu_sys:%.2f\r\n"
3224         "used_cpu_user:%.2f\r\n"
3225         "used_cpu_sys_children:%.2f\r\n"
3226         "used_cpu_user_children:%.2f\r\n",
3227         (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000,
3228         (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000,
3229         (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
3230         (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000);
3231     }
3232 
3233     /* cmdtime */
3234     if (allsections || !strcasecmp(section,"commandstats")) {
3235         if (sections++) info = sdscat(info,"\r\n");
3236         info = sdscatprintf(info, "# Commandstats\r\n");
3237         numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
3238         for (j = 0; j < numcommands; j++) {
3239             struct redisCommand *c = redisCommandTable+j;
3240 
3241             if (!c->calls) continue;
3242             info = sdscatprintf(info,
3243                 "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n",
3244                 c->name, c->calls, c->microseconds,
3245                 (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls));
3246         }
3247     }
3248 
3249     /* Cluster */
3250     if (allsections || defsections || !strcasecmp(section,"cluster")) {
3251         if (sections++) info = sdscat(info,"\r\n");
3252         info = sdscatprintf(info,
3253         "# Cluster\r\n"
3254         "cluster_enabled:%d\r\n",
3255         server.cluster_enabled);
3256     }
3257 
3258     /* Key space */
3259     if (allsections || defsections || !strcasecmp(section,"keyspace")) {
3260         if (sections++) info = sdscat(info,"\r\n");
3261         info = sdscatprintf(info, "# Keyspace\r\n");
3262         for (j = 0; j < server.dbnum; j++) {
3263             long long keys, vkeys;
3264 
3265             keys = dictSize(server.db[j].dict);
3266             vkeys = dictSize(server.db[j].expires);
3267             if (keys || vkeys) {
3268                 info = sdscatprintf(info,
3269                     "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
3270                     j, keys, vkeys, server.db[j].avg_ttl);
3271             }
3272         }
3273     }
3274     return info;
3275 }
3276 
3277 void infoCommand(client *c) {
3278     char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
3279 
3280     if (c->argc > 2) {
3281         addReply(c,shared.syntaxerr);
3282         return;
3283     }
3284     addReplyBulkSds(c, genRedisInfoString(section));
3285 }
3286 
3287 void monitorCommand(client *c) {
3288     /* ignore MONITOR if already slave or in monitor mode */
3289     if (c->flags & CLIENT_SLAVE) return;
3290 
3291     c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
3292     listAddNodeTail(server.monitors,c);
3293     addReply(c,shared.ok);
3294 }
3295 
3296 /* ============================ Maxmemory directive  ======================== */
3297 
3298 /* freeMemoryIfNeeded() gets called when 'maxmemory' is set on the config
3299  * file to limit the max memory used by the server, before processing a
3300  * command.
3301  *
3302  * The goal of the function is to free enough memory to keep Redis under the
3303  * configured memory limit.
3304  *
 * The function starts by calculating how many bytes should be freed to keep
 * Redis under the limit, and enters a loop selecting the best keys to
 * evict according to the configured policy.
3308  *
3309  * If all the bytes needed to return back under the limit were freed the
3310  * function returns C_OK, otherwise C_ERR is returned, and the caller
3311  * should block the execution of commands that will result in more memory
3312  * used by the server.
3313  *
3314  * ------------------------------------------------------------------------
3315  *
3316  * LRU approximation algorithm
3317  *
 * Redis uses an approximation of the LRU algorithm that runs in constant
 * memory. Every time there is a key to evict, we sample N keys (with
 * N very small, usually around 5) to populate a pool of M candidate keys
 * to evict (the pool size is defined by MAXMEMORY_EVICTION_POOL_SIZE).
3322  *
3323  * The N keys sampled are added in the pool of good keys to expire (the one
3324  * with an old access time) if they are better than one of the current keys
3325  * in the pool.
3326  *
3327  * After the pool is populated, the best key we have in the pool is expired.
3328  * However note that we don't remove keys from the pool when they are deleted
3329  * so the pool may contain keys that no longer exist.
3330  *
3331  * When we try to evict a key, and all the entries in the pool don't exist
3332  * we populate it again. This time we'll be sure that the pool has at least
3333  * one key that can be evicted, if there is at least one key that can be
3334  * evicted in the whole database. */
3335 
3336 /* Create a new eviction pool. */
3337 struct evictionPoolEntry *evictionPoolAlloc(void) {
3338     struct evictionPoolEntry *ep;
3339     int j;
3340 
3341     ep = zmalloc(sizeof(*ep)*MAXMEMORY_EVICTION_POOL_SIZE);
3342     for (j = 0; j < MAXMEMORY_EVICTION_POOL_SIZE; j++) {
3343         ep[j].idle = 0;
3344         ep[j].key = NULL;
3345     }
3346     return ep;
3347 }
3348 
3349 /* This is an helper function for freeMemoryIfNeeded(), it is used in order
3350  * to populate the evictionPool with a few entries every time we want to
3351  * expire a key. Keys with idle time smaller than one of the current
3352  * keys are added. Keys are always added if there are free entries.
3353  *
3354  * We insert keys on place in ascending order, so keys with the smaller
3355  * idle time are on the left, and keys with the higher idle time on the
3356  * right. */
3357 
3358 #define EVICTION_SAMPLES_ARRAY_SIZE 16
3359 void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
3360     int j, k, count;
3361     dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
3362     dictEntry **samples;
3363 
3364     /* Try to use a static buffer: this function is a big hit...
3365      * Note: it was actually measured that this helps. */
3366     if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
3367         samples = _samples;
3368     } else {
3369         samples = zmalloc(sizeof(samples[0])*server.maxmemory_samples);
3370     }
3371 
3372     count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
3373     for (j = 0; j < count; j++) {
3374         unsigned long long idle;
3375         sds key;
3376         robj *o;
3377         dictEntry *de;
3378 
3379         de = samples[j];
3380         key = dictGetKey(de);
3381         /* If the dictionary we are sampling from is not the main
3382          * dictionary (but the expires one) we need to lookup the key
3383          * again in the key dictionary to obtain the value object. */
3384         if (sampledict != keydict) de = dictFind(keydict, key);
3385         o = dictGetVal(de);
3386         idle = estimateObjectIdleTime(o);
3387 
3388         /* Insert the element inside the pool.
3389          * First, find the first empty bucket or the first populated
3390          * bucket that has an idle time smaller than our idle time. */
3391         k = 0;
3392         while (k < MAXMEMORY_EVICTION_POOL_SIZE &&
3393                pool[k].key &&
3394                pool[k].idle < idle) k++;
3395         if (k == 0 && pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key != NULL) {
3396             /* Can't insert if the element is < the worst element we have
3397              * and there are no empty buckets. */
3398             continue;
3399         } else if (k < MAXMEMORY_EVICTION_POOL_SIZE && pool[k].key == NULL) {
3400             /* Inserting into empty position. No setup needed before insert. */
3401         } else {
3402             /* Inserting in the middle. Now k points to the first element
3403              * greater than the element to insert.  */
3404             if (pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key == NULL) {
3405                 /* Free space on the right? Insert at k shifting
3406                  * all the elements from k to end to the right. */
3407                 memmove(pool+k+1,pool+k,
3408                     sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
3409             } else {
3410                 /* No free space on right? Insert at k-1 */
3411                 k--;
3412                 /* Shift all elements on the left of k (included) to the
3413                  * left, so we discard the element with smaller idle time. */
3414                 sdsfree(pool[0].key);
3415                 memmove(pool,pool+1,sizeof(pool[0])*k);
3416             }
3417         }
3418         pool[k].key = sdsdup(key);
3419         pool[k].idle = idle;
3420     }
3421     if (samples != _samples) zfree(samples);
3422 }
3423 
3424 int freeMemoryIfNeeded(void) {
3425     size_t mem_used, mem_tofree, mem_freed;
3426     int slaves = listLength(server.slaves);
3427     mstime_t latency, eviction_latency;
3428 
3429     /* Remove the size of slaves output buffers and AOF buffer from the
3430      * count of used memory. */
3431     mem_used = zmalloc_used_memory();
3432     if (slaves) {
3433         listIter li;
3434         listNode *ln;
3435 
3436         listRewind(server.slaves,&li);
3437         while((ln = listNext(&li))) {
3438             client *slave = listNodeValue(ln);
3439             unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
3440             if (obuf_bytes > mem_used)
3441                 mem_used = 0;
3442             else
3443                 mem_used -= obuf_bytes;
3444         }
3445     }
3446     if (server.aof_state != AOF_OFF) {
3447         mem_used -= sdslen(server.aof_buf);
3448         mem_used -= aofRewriteBufferSize();
3449     }
3450 
3451     /* Check if we are over the memory limit. */
3452     if (mem_used <= server.maxmemory) return C_OK;
3453 
3454     if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
3455         return C_ERR; /* We need to free memory, but policy forbids. */
3456 
3457     /* Compute how much memory we need to free. */
3458     mem_tofree = mem_used - server.maxmemory;
3459     mem_freed = 0;
3460     latencyStartMonitor(latency);
3461     while (mem_freed < mem_tofree) {
3462         int j, k, keys_freed = 0;
3463 
3464         for (j = 0; j < server.dbnum; j++) {
3465             long bestval = 0; /* just to prevent warning */
3466             sds bestkey = NULL;
3467             dictEntry *de;
3468             redisDb *db = server.db+j;
3469             dict *dict;
3470 
3471             if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
3472                 server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
3473             {
3474                 dict = server.db[j].dict;
3475             } else {
3476                 dict = server.db[j].expires;
3477             }
3478             if (dictSize(dict) == 0) continue;
3479 
3480             /* volatile-random and allkeys-random policy */
3481             if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
3482                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
3483             {
3484                 de = dictGetRandomKey(dict);
3485                 bestkey = dictGetKey(de);
3486             }
3487 
3488             /* volatile-lru and allkeys-lru policy */
3489             else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
3490                 server.maxmemory_policy == MAXMEMORY_VOLATILE_LRU)
3491             {
3492                 struct evictionPoolEntry *pool = db->eviction_pool;
3493 
3494                 while(bestkey == NULL) {
3495                     evictionPoolPopulate(dict, db->dict, db->eviction_pool);
3496                     /* Go backward from best to worst element to evict. */
3497                     for (k = MAXMEMORY_EVICTION_POOL_SIZE-1; k >= 0; k--) {
3498                         if (pool[k].key == NULL) continue;
3499                         de = dictFind(dict,pool[k].key);
3500 
3501                         /* Remove the entry from the pool. */
3502                         sdsfree(pool[k].key);
3503                         /* Shift all elements on its right to left. */
3504                         memmove(pool+k,pool+k+1,
3505                             sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
3506                         /* Clear the element on the right which is empty
3507                          * since we shifted one position to the left.  */
3508                         pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key = NULL;
3509                         pool[MAXMEMORY_EVICTION_POOL_SIZE-1].idle = 0;
3510 
3511                         /* If the key exists, is our pick. Otherwise it is
3512                          * a ghost and we need to try the next element. */
3513                         if (de) {
3514                             bestkey = dictGetKey(de);
3515                             break;
3516                         } else {
3517                             /* Ghost... */
3518                             continue;
3519                         }
3520                     }
3521                 }
3522             }
3523 
3524             /* volatile-ttl */
3525             else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
3526                 for (k = 0; k < server.maxmemory_samples; k++) {
3527                     sds thiskey;
3528                     long thisval;
3529 
3530                     de = dictGetRandomKey(dict);
3531                     thiskey = dictGetKey(de);
3532                     thisval = (long) dictGetVal(de);
3533 
3534                     /* Expire sooner (minor expire unix timestamp) is better
3535                      * candidate for deletion */
3536                     if (bestkey == NULL || thisval < bestval) {
3537                         bestkey = thiskey;
3538                         bestval = thisval;
3539                     }
3540                 }
3541             }
3542 
3543             /* Finally remove the selected key. */
3544             if (bestkey) {
3545                 long long delta;
3546 
3547                 robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
3548                 propagateExpire(db,keyobj);
3549                 /* We compute the amount of memory freed by dbDelete() alone.
3550                  * It is possible that actually the memory needed to propagate
3551                  * the DEL in AOF and replication link is greater than the one
3552                  * we are freeing removing the key, but we can't account for
3553                  * that otherwise we would never exit the loop.
3554                  *
3555                  * AOF and Output buffer memory will be freed eventually so
3556                  * we only care about memory used by the key space. */
3557                 delta = (long long) zmalloc_used_memory();
3558                 latencyStartMonitor(eviction_latency);
3559                 dbDelete(db,keyobj);
3560                 latencyEndMonitor(eviction_latency);
3561                 latencyAddSampleIfNeeded("eviction-del",eviction_latency);
3562                 latencyRemoveNestedEvent(latency,eviction_latency);
3563                 delta -= (long long) zmalloc_used_memory();
3564                 mem_freed += delta;
3565                 server.stat_evictedkeys++;
3566                 notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
3567                     keyobj, db->id);
3568                 decrRefCount(keyobj);
3569                 keys_freed++;
3570 
3571                 /* When the memory to free starts to be big enough, we may
3572                  * start spending so much time here that is impossible to
3573                  * deliver data to the slaves fast enough, so we force the
3574                  * transmission here inside the loop. */
3575                 if (slaves) flushSlavesOutputBuffers();
3576             }
3577         }
3578         if (!keys_freed) {
3579             latencyEndMonitor(latency);
3580             latencyAddSampleIfNeeded("eviction-cycle",latency);
3581             return C_ERR; /* nothing to free... */
3582         }
3583     }
3584     latencyEndMonitor(latency);
3585     latencyAddSampleIfNeeded("eviction-cycle",latency);
3586     return C_OK;
3587 }
3588 
3589 /* =================================== Main! ================================ */
3590 
3591 #ifdef __linux__
/* Read the kernel overcommit policy from /proc/sys/vm/overcommit_memory.
 *
 * Returns the integer that atoi() parses from the first line of the file,
 * or -1 when the file cannot be opened or no line can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;

    /* Only the first line matters; a failed read keeps the -1 default. */
    if (fgets(line,sizeof(line),procfile) != NULL) value = atoi(line);
    fclose(procfile);

    return value;
}
3605 
3606 void linuxMemoryWarnings(void) {
3607     if (linuxOvercommitMemoryValue() == 0) {
3608         serverLog(LL_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
3609     }
3610     if (THPIsEnabled()) {
3611         serverLog(LL_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.");
3612     }
3613 }
3614 #endif /* __linux__ */
3615 
3616 void createPidFile(void) {
3617     /* If pidfile requested, but no pidfile defined, use
3618      * default pidfile path */
3619     if (!server.pidfile) server.pidfile = zstrdup(CONFIG_DEFAULT_PID_FILE);
3620 
3621     /* Try to write the pid file in a best-effort way. */
3622     FILE *fp = fopen(server.pidfile,"w");
3623     if (fp) {
3624         fprintf(fp,"%d\n",(int)getpid());
3625         fclose(fp);
3626     }
3627 }
3628 
/* Detach the process from the controlling terminal and run in background.
 *
 * The parent process exits with status 0 and the child continues in a new
 * session with stdin, stdout and stderr redirected to /dev/null.
 *
 * If fork() fails there is no child to carry on, so the error is reported
 * on stderr and the process exits with status 1 instead of silently
 * terminating as if daemonization had succeeded. */
void daemonize(void) {
    int fd;
    pid_t pid = fork();

    if (pid == -1) {
        /* fork() failed: we are still the only process. */
        fprintf(stderr,"Can't daemonize, fork() failed: %s\n",
            strerror(errno));
        exit(1);
    }
    if (pid != 0) exit(0); /* parent exits */
    setsid(); /* create a new session */

    /* Every output goes to /dev/null. If Redis is daemonized but
     * the 'logfile' is set to 'stdout' in the configuration file
     * it will not log at all. */
    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        /* Do not close fd when it is itself one of the standard streams. */
        if (fd > STDERR_FILENO) close(fd);
    }
}
3645 
3646 void version(void) {
3647     printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n",
3648         REDIS_VERSION,
3649         redisGitSHA1(),
3650         atoi(redisGitDirty()) > 0,
3651         ZMALLOC_LIB,
3652         sizeof(long) == 4 ? 32 : 64,
3653         (unsigned long long) redisBuildId());
3654     exit(0);
3655 }
3656 
/* Print the command line help on stderr and exit with status 1.
 * This function never returns. */
void usage(void) {
    /* One entry per output line; written to stderr in this order. */
    static const char *const help_lines[] = {
        "Usage: ./redis-server [/path/to/redis.conf] [options]\n",
        "       ./redis-server - (read config from stdin)\n",
        "       ./redis-server -v or --version\n",
        "       ./redis-server -h or --help\n",
        "       ./redis-server --test-memory <megabytes>\n\n",
        "Examples:\n",
        "       ./redis-server (run the server with default conf)\n",
        "       ./redis-server /etc/redis/6379.conf\n",
        "       ./redis-server --port 7777\n",
        "       ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n",
        "       ./redis-server /etc/myredis.conf --loglevel verbose\n\n",
        "Sentinel mode:\n",
        "       ./redis-server /etc/sentinel.conf --sentinel\n"
    };
    size_t count = sizeof(help_lines)/sizeof(help_lines[0]);
    size_t i;

    for (i = 0; i < count; i++) fputs(help_lines[i],stderr);
    exit(1);
}
3673 
3674 void redisAsciiArt(void) {
3675 #include "asciilogo.h"
3676     char *buf = zmalloc(1024*16);
3677     char *mode;
3678 
3679     if (server.cluster_enabled) mode = "cluster";
3680     else if (server.sentinel_mode) mode = "sentinel";
3681     else mode = "standalone";
3682 
3683     if (server.syslog_enabled) {
3684         serverLog(LL_NOTICE,
3685             "Redis %s (%s/%d) %s bit, %s mode, port %d, pid %ld ready to start.",
3686             REDIS_VERSION,
3687             redisGitSHA1(),
3688             strtol(redisGitDirty(),NULL,10) > 0,
3689             (sizeof(long) == 8) ? "64" : "32",
3690             mode, server.port,
3691             (long) getpid()
3692         );
3693     } else {
3694         snprintf(buf,1024*16,ascii_logo,
3695             REDIS_VERSION,
3696             redisGitSHA1(),
3697             strtol(redisGitDirty(),NULL,10) > 0,
3698             (sizeof(long) == 8) ? "64" : "32",
3699             mode, server.port,
3700             (long) getpid()
3701         );
3702         serverLogRaw(LL_NOTICE|LL_RAW,buf);
3703     }
3704     zfree(buf);
3705 }
3706 
3707 static void sigShutdownHandler(int sig) {
3708     char *msg;
3709 
3710     switch (sig) {
3711     case SIGINT:
3712         msg = "Received SIGINT scheduling shutdown...";
3713         break;
3714     case SIGTERM:
3715         msg = "Received SIGTERM scheduling shutdown...";
3716         break;
3717     default:
3718         msg = "Received shutdown signal, scheduling shutdown...";
3719     };
3720 
3721     /* SIGINT is often delivered via Ctrl+C in an interactive session.
3722      * If we receive the signal the second time, we interpret this as
3723      * the user really wanting to quit ASAP without waiting to persist
3724      * on disk. */
3725     if (server.shutdown_asap && sig == SIGINT) {
3726         serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
3727         rdbRemoveTempFile(getpid());
3728         exit(1); /* Exit with an error since this was not a clean shutdown. */
3729     } else if (server.loading) {
3730         exit(0);
3731     }
3732 
3733     serverLogFromHandler(LL_WARNING, msg);
3734     server.shutdown_asap = 1;
3735 }
3736 
3737 void setupSignalHandlers(void) {
3738     struct sigaction act;
3739 
3740     /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
3741      * Otherwise, sa_handler is used. */
3742     sigemptyset(&act.sa_mask);
3743     act.sa_flags = 0;
3744     act.sa_handler = sigShutdownHandler;
3745     sigaction(SIGTERM, &act, NULL);
3746     sigaction(SIGINT, &act, NULL);
3747 
3748 #ifdef HAVE_BACKTRACE
3749     sigemptyset(&act.sa_mask);
3750     act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
3751     act.sa_sigaction = sigsegvHandler;
3752     sigaction(SIGSEGV, &act, NULL);
3753     sigaction(SIGBUS, &act, NULL);
3754     sigaction(SIGFPE, &act, NULL);
3755     sigaction(SIGILL, &act, NULL);
3756 #endif
3757     return;
3758 }
3759 
3760 void memtest(size_t megabytes, int passes);
3761 
/* Detect whether the server was asked to run in Sentinel mode.
 *
 * Returns 1 if argv[0] contains the substring "redis-sentinel", or if any
 * of the following arguments is exactly "--sentinel". Returns 0 otherwise. */
int checkForSentinelMode(int argc, char **argv) {
    int found = (strstr(argv[0],"redis-sentinel") != NULL);
    int idx = 1;

    while (!found && idx < argc) {
        found = (strcmp(argv[idx],"--sentinel") == 0);
        idx++;
    }
    return found;
}
3772 
3773 /* Function called at startup to load RDB or AOF file in memory. */
3774 void loadDataFromDisk(void) {
3775     long long start = ustime();
3776     if (server.aof_state == AOF_ON) {
3777         if (loadAppendOnlyFile(server.aof_filename) == C_OK)
3778             serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
3779     } else {
3780         if (rdbLoad(server.rdb_filename) == C_OK) {
3781             serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
3782                 (float)(ustime()-start)/1000000);
3783         } else if (errno != ENOENT) {
3784             serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
3785             exit(1);
3786         }
3787     }
3788 }
3789 
3790 void redisOutOfMemoryHandler(size_t allocation_size) {
3791     serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!",
3792         allocation_size);
3793     serverPanic("Redis aborting for OUT OF MEMORY");
3794 }
3795 
/* Set the process title to "<title> <addr>:<port><mode>".
 *
 * <addr> is the first bind address, or "*" when none is configured; <mode>
 * is " [cluster]", " [sentinel]" or empty. Without USE_SETPROCTITLE the
 * function does nothing. */
void redisSetProcTitle(char *title) {
#ifdef USE_SETPROCTITLE
    char *addr = server.bindaddr_count ? server.bindaddr[0] : "*";
    char *suffix;

    if (server.cluster_enabled) {
        suffix = " [cluster]";
    } else if (server.sentinel_mode) {
        suffix = " [sentinel]";
    } else {
        suffix = "";
    }

    setproctitle("%s %s:%d%s", title, addr, server.port, suffix);
#else
    UNUSED(title);
#endif
}
3811 
3812 /*
3813  * Check whether systemd or upstart have been used to start redis.
3814  */
3815 
3816 int redisSupervisedUpstart(void) {
3817     const char *upstart_job = getenv("UPSTART_JOB");
3818 
3819     if (!upstart_job) {
3820         serverLog(LL_WARNING,
3821                 "upstart supervision requested, but UPSTART_JOB not found");
3822         return 0;
3823     }
3824 
3825     serverLog(LL_NOTICE, "supervised by upstart, will stop to signal readiness");
3826     raise(SIGSTOP);
3827     unsetenv("UPSTART_JOB");
3828     return 1;
3829 }
3830 
3831 int redisSupervisedSystemd(void) {
3832     const char *notify_socket = getenv("NOTIFY_SOCKET");
3833     int fd = 1;
3834     struct sockaddr_un su;
3835     struct iovec iov;
3836     struct msghdr hdr;
3837     int sendto_flags = 0;
3838 
3839     if (!notify_socket) {
3840         serverLog(LL_WARNING,
3841                 "systemd supervision requested, but NOTIFY_SOCKET not found");
3842         return 0;
3843     }
3844 
3845     if ((strchr("@/", notify_socket[0])) == NULL || strlen(notify_socket) < 2) {
3846         return 0;
3847     }
3848 
3849     serverLog(LL_NOTICE, "supervised by systemd, will signal readiness");
3850     if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) {
3851         serverLog(LL_WARNING,
3852                 "Can't connect to systemd socket %s", notify_socket);
3853         return 0;
3854     }
3855 
3856     memset(&su, 0, sizeof(su));
3857     su.sun_family = AF_UNIX;
3858     strncpy (su.sun_path, notify_socket, sizeof(su.sun_path) -1);
3859     su.sun_path[sizeof(su.sun_path) - 1] = '\0';
3860 
3861     if (notify_socket[0] == '@')
3862         su.sun_path[0] = '\0';
3863 
3864     memset(&iov, 0, sizeof(iov));
3865     iov.iov_base = "READY=1";
3866     iov.iov_len = strlen("READY=1");
3867 
3868     memset(&hdr, 0, sizeof(hdr));
3869     hdr.msg_name = &su;
3870     hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) +
3871         strlen(notify_socket);
3872     hdr.msg_iov = &iov;
3873     hdr.msg_iovlen = 1;
3874 
3875     unsetenv("NOTIFY_SOCKET");
3876 #ifdef HAVE_MSG_NOSIGNAL
3877     sendto_flags |= MSG_NOSIGNAL;
3878 #endif
3879     if (sendmsg(fd, &hdr, sendto_flags) < 0) {
3880         serverLog(LL_WARNING, "Can't send notification to systemd");
3881         close(fd);
3882         return 0;
3883     }
3884     close(fd);
3885     return 1;
3886 }
3887 
3888 int redisIsSupervised(int mode) {
3889     if (mode == SUPERVISED_AUTODETECT) {
3890         const char *upstart_job = getenv("UPSTART_JOB");
3891         const char *notify_socket = getenv("NOTIFY_SOCKET");
3892 
3893         if (upstart_job) {
3894             redisSupervisedUpstart();
3895         } else if (notify_socket) {
3896             redisSupervisedSystemd();
3897         }
3898     } else if (mode == SUPERVISED_UPSTART) {
3899         return redisSupervisedUpstart();
3900     } else if (mode == SUPERVISED_SYSTEMD) {
3901         return redisSupervisedSystemd();
3902     }
3903 
3904     return 0;
3905 }
3906 
3907 
3908 int main(int argc, char **argv) {
3909     struct timeval tv;
3910     int j;
3911 
3912 #ifdef REDIS_TEST
3913     if (argc == 3 && !strcasecmp(argv[1], "test")) {
3914         if (!strcasecmp(argv[2], "ziplist")) {
3915             return ziplistTest(argc, argv);
3916         } else if (!strcasecmp(argv[2], "quicklist")) {
3917             quicklistTest(argc, argv);
3918         } else if (!strcasecmp(argv[2], "intset")) {
3919             return intsetTest(argc, argv);
3920         } else if (!strcasecmp(argv[2], "zipmap")) {
3921             return zipmapTest(argc, argv);
3922         } else if (!strcasecmp(argv[2], "sha1test")) {
3923             return sha1Test(argc, argv);
3924         } else if (!strcasecmp(argv[2], "util")) {
3925             return utilTest(argc, argv);
3926         } else if (!strcasecmp(argv[2], "sds")) {
3927             return sdsTest(argc, argv);
3928         } else if (!strcasecmp(argv[2], "endianconv")) {
3929             return endianconvTest(argc, argv);
3930         } else if (!strcasecmp(argv[2], "crc64")) {
3931             return crc64Test(argc, argv);
3932         }
3933 
3934         return -1; /* test not found */
3935     }
3936 #endif
3937 
3938     /* We need to initialize our libraries, and the server configuration. */
3939 #ifdef INIT_SETPROCTITLE_REPLACEMENT
3940     spt_init(argc, argv);
3941 #endif
3942     setlocale(LC_COLLATE,"");
3943     zmalloc_enable_thread_safeness();
3944     zmalloc_set_oom_handler(redisOutOfMemoryHandler);
3945     srand(time(NULL)^getpid());
3946     gettimeofday(&tv,NULL);
3947     dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
3948     server.sentinel_mode = checkForSentinelMode(argc,argv);
3949     initServerConfig();
3950 
3951     /* Store the executable path and arguments in a safe place in order
3952      * to be able to restart the server later. */
3953     server.executable = getAbsolutePath(argv[0]);
3954     server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
3955     server.exec_argv[argc] = NULL;
3956     for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
3957 
3958     /* We need to init sentinel right now as parsing the configuration file
3959      * in sentinel mode will have the effect of populating the sentinel
3960      * data structures with master nodes to monitor. */
3961     if (server.sentinel_mode) {
3962         initSentinelConfig();
3963         initSentinel();
3964     }
3965 
3966     /* Check if we need to start in redis-check-rdb mode. We just execute
3967      * the program main. However the program is part of the Redis executable
3968      * so that we can easily execute an RDB check on loading errors. */
3969     if (strstr(argv[0],"redis-check-rdb") != NULL)
3970         exit(redis_check_rdb_main(argv,argc));
3971 
3972     if (argc >= 2) {
3973         j = 1; /* First option to parse in argv[] */
3974         sds options = sdsempty();
3975         char *configfile = NULL;
3976 
3977         /* Handle special options --help and --version */
3978         if (strcmp(argv[1], "-v") == 0 ||
3979             strcmp(argv[1], "--version") == 0) version();
3980         if (strcmp(argv[1], "--help") == 0 ||
3981             strcmp(argv[1], "-h") == 0) usage();
3982         if (strcmp(argv[1], "--test-memory") == 0) {
3983             if (argc == 3) {
3984                 memtest(atoi(argv[2]),50);
3985                 exit(0);
3986             } else {
3987                 fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
3988                 fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
3989                 exit(1);
3990             }
3991         }
3992 
3993         /* First argument is the config file name? */
3994         if (argv[j][0] != '-' || argv[j][1] != '-') {
3995             configfile = argv[j];
3996             server.configfile = getAbsolutePath(configfile);
3997             /* Replace the config file in server.exec_argv with
3998              * its absoulte path. */
3999             zfree(server.exec_argv[j]);
4000             server.exec_argv[j] = zstrdup(server.configfile);
4001             j++;
4002         }
4003 
4004         /* All the other options are parsed and conceptually appended to the
4005          * configuration file. For instance --port 6380 will generate the
4006          * string "port 6380\n" to be parsed after the actual file name
4007          * is parsed, if any. */
4008         while(j != argc) {
4009             if (argv[j][0] == '-' && argv[j][1] == '-') {
4010                 /* Option name */
4011                 if (!strcmp(argv[j], "--check-rdb")) {
4012                     /* Argument has no options, need to skip for parsing. */
4013                     j++;
4014                     continue;
4015                 }
4016                 if (sdslen(options)) options = sdscat(options,"\n");
4017                 options = sdscat(options,argv[j]+2);
4018                 options = sdscat(options," ");
4019             } else {
4020                 /* Option argument */
4021                 options = sdscatrepr(options,argv[j],strlen(argv[j]));
4022                 options = sdscat(options," ");
4023             }
4024             j++;
4025         }
4026         if (server.sentinel_mode && configfile && *configfile == '-') {
4027             serverLog(LL_WARNING,
4028                 "Sentinel config from STDIN not allowed.");
4029             serverLog(LL_WARNING,
4030                 "Sentinel needs config file on disk to save state.  Exiting...");
4031             exit(1);
4032         }
4033         resetServerSaveParams();
4034         loadServerConfig(configfile,options);
4035         sdsfree(options);
4036     } else {
4037         serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
4038     }
4039 
4040     server.supervised = redisIsSupervised(server.supervised_mode);
4041     int background = server.daemonize && !server.supervised;
4042     if (background) daemonize();
4043 
4044     initServer();
4045     if (background || server.pidfile) createPidFile();
4046     redisSetProcTitle(argv[0]);
4047     redisAsciiArt();
4048     checkTcpBacklogSettings();
4049 
4050     if (!server.sentinel_mode) {
4051         /* Things not needed when running in Sentinel mode. */
4052         serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION);
4053     #ifdef __linux__
4054         linuxMemoryWarnings();
4055     #endif
4056         loadDataFromDisk();
4057         if (server.cluster_enabled) {
4058             if (verifyClusterConfigWithData() == C_ERR) {
4059                 serverLog(LL_WARNING,
4060                     "You can't have keys in a DB different than DB 0 when in "
4061                     "Cluster mode. Exiting.");
4062                 exit(1);
4063             }
4064         }
4065         if (server.ipfd_count > 0)
4066             serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4067         if (server.sofd > 0)
4068             serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
4069     } else {
4070         sentinelIsRunning();
4071     }
4072 
4073     /* Warning the user about suspicious maxmemory setting. */
4074     if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
4075         serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
4076     }
4077 
4078     aeSetBeforeSleepProc(server.el,beforeSleep);
4079     aeMain(server.el);
4080     aeDeleteEventLoop(server.el);
4081     return 0;
4082 }
4083 
4084 /* The End */
4085