xref: /redis-3.2.3/src/server.c (revision 0a45fbc3)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "cluster.h"
32 #include "slowlog.h"
33 #include "bio.h"
34 #include "latency.h"
35 
36 #include <time.h>
37 #include <signal.h>
38 #include <sys/wait.h>
39 #include <errno.h>
40 #include <assert.h>
41 #include <ctype.h>
42 #include <stdarg.h>
43 #include <arpa/inet.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46 #include <sys/time.h>
47 #include <sys/resource.h>
48 #include <sys/uio.h>
49 #include <sys/un.h>
50 #include <limits.h>
51 #include <float.h>
52 #include <math.h>
53 #include <sys/resource.h>
54 #include <sys/utsname.h>
55 #include <locale.h>
56 #include <sys/socket.h>
57 
58 /* Our shared "common" objects */
59 
60 struct sharedObjectsStruct shared;
61 
62 /* Global vars that are actually used as constants. The following double
63  * values are used for double on-disk serialization, and are initialized
64  * at runtime to avoid strange compiler optimizations. */
65 
66 double R_Zero, R_PosInf, R_NegInf, R_Nan;
67 
68 /*================================= Globals ================================= */
69 
70 /* Global vars */
71 struct redisServer server; /* server global state */
72 
73 /* Our command table.
74  *
75  * Every entry is composed of the following fields:
76  *
77  * name: a string representing the command name.
78  * function: pointer to the C function implementing the command.
79  * arity: number of arguments, it is possible to use -N to say >= N
80  * sflags: command flags as string. See below for a table of flags.
81  * flags: flags as bitmask. Computed by Redis using the 'sflags' field.
82  * get_keys_proc: an optional function to get key arguments from a command.
83  *                This is only used when the following three fields are not
84  *                enough to specify what arguments are keys.
85  * first_key_index: first argument that is a key
86  * last_key_index: last argument that is a key
87  * key_step: step to get all the keys from first to last argument. For instance
88  *           in MSET the step is two since arguments are key,val,key,val,...
89  * microseconds: microseconds of total execution time for this command.
90  * calls: total number of calls of this command.
91  *
92  * The flags, microseconds and calls fields are computed by Redis and should
93  * always be set to zero.
94  *
95  * Command flags are expressed using strings where every character represents
96  * a flag. Later the populateCommandTable() function will take care of
 * populating the real 'flags' field using these characters.
98  *
99  * This is the meaning of the flags:
100  *
101  * w: write command (may modify the key space).
102  * r: read command  (will never modify the key space).
103  * m: may increase memory usage once called. Don't allow if out of memory.
104  * a: admin command, like SAVE or SHUTDOWN.
105  * p: Pub/Sub related command.
106  * f: force replication of this command, regardless of server.dirty.
107  * s: command not allowed in scripts.
108  * R: random command. Command is not deterministic, that is, the same command
109  *    with the same arguments, with the same key space, may have different
110  *    results. For instance SPOP and RANDOMKEY are two random commands.
111  * S: Sort command output array if called from script, so that the output
112  *    is deterministic.
113  * l: Allow command while loading the database.
114  * t: Allow command while a slave has stale data but is not allowed to
 *    serve this data. Normally no command is accepted in this condition
116  *    but just a few.
117  * M: Do not automatically propagate the command on MONITOR.
118  * k: Perform an implicit ASKING for this command, so the command will be
119  *    accepted in cluster mode if the slot is marked as 'importing'.
120  * F: Fast command: O(1) or O(log(N)) command that should never delay
121  *    its execution as long as the kernel scheduler is giving us time.
122  *    Note that commands that may trigger a DEL as a side effect (like SET)
123  *    are not fast commands.
124  */
125 struct redisCommand redisCommandTable[] = {
126     {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
127     {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
128     {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
129     {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
130     {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
131     {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
132     {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
133     {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
134     {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
135     {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
136     {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
137     {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0},
138     {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
139     {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
140     {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
141     {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
142     {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
143     {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
144     {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
145     {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
146     {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
147     {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
148     {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
149     {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
150     {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
151     {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
152     {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
153     {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
154     {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
155     {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
156     {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
157     {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
158     {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
159     {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
160     {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
161     {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
162     {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
163     {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
164     {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
165     {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
166     {"spop",spopCommand,-2,"wRF",0,NULL,1,1,1,0,0},
167     {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
168     {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
169     {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
170     {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
171     {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
172     {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
173     {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
174     {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
175     {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
176     {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
177     {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
178     {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0},
179     {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0},
180     {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0},
181     {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0},
182     {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
183     {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
184     {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
185     {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
186     {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
187     {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
188     {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
189     {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0},
190     {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0},
191     {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
192     {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0},
193     {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0},
194     {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
195     {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
196     {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
197     {"hset",hsetCommand,4,"wmF",0,NULL,1,1,1,0,0},
198     {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
199     {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
200     {"hmset",hmsetCommand,-4,"wm",0,NULL,1,1,1,0,0},
201     {"hmget",hmgetCommand,-3,"r",0,NULL,1,1,1,0,0},
202     {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
203     {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0},
204     {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
205     {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0},
206     {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0},
207     {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
208     {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
209     {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0},
210     {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0},
211     {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
212     {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
213     {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
214     {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0},
215     {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0},
216     {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
217     {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
218     {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
219     {"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0},
220     {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0},
221     {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0},
222     {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0},
223     {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0},
224     {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0},
225     {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0},
226     {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0},
227     {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
228     {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0},
229     {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0},
230     {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0},
231     {"ping",pingCommand,-1,"tF",0,NULL,0,0,0,0,0},
232     {"echo",echoCommand,2,"F",0,NULL,0,0,0,0,0},
233     {"save",saveCommand,1,"as",0,NULL,0,0,0,0,0},
234     {"bgsave",bgsaveCommand,-1,"a",0,NULL,0,0,0,0,0},
235     {"bgrewriteaof",bgrewriteaofCommand,1,"a",0,NULL,0,0,0,0,0},
236     {"shutdown",shutdownCommand,-1,"alt",0,NULL,0,0,0,0,0},
237     {"lastsave",lastsaveCommand,1,"RF",0,NULL,0,0,0,0,0},
238     {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0},
239     {"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0},
240     {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
241     {"discard",discardCommand,1,"sF",0,NULL,0,0,0,0,0},
242     {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
243     {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
244     {"replconf",replconfCommand,-1,"aslt",0,NULL,0,0,0,0,0},
245     {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
246     {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
247     {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0},
248     {"info",infoCommand,-1,"lt",0,NULL,0,0,0,0,0},
249     {"monitor",monitorCommand,1,"as",0,NULL,0,0,0,0,0},
250     {"ttl",ttlCommand,2,"rF",0,NULL,1,1,1,0,0},
251     {"touch",touchCommand,-2,"rF",0,NULL,1,1,1,0,0},
252     {"pttl",pttlCommand,2,"rF",0,NULL,1,1,1,0,0},
253     {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
254     {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
255     {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
256     {"debug",debugCommand,-1,"as",0,NULL,0,0,0,0,0},
257     {"config",configCommand,-2,"lat",0,NULL,0,0,0,0,0},
258     {"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
259     {"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
260     {"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
261     {"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
262     {"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0},
263     {"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},
264     {"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0},
265     {"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0},
266     {"cluster",clusterCommand,-2,"a",0,NULL,0,0,0,0,0},
267     {"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
268     {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
269     {"migrate",migrateCommand,-6,"w",0,migrateGetKeys,0,0,0,0,0},
270     {"asking",askingCommand,1,"F",0,NULL,0,0,0,0,0},
271     {"readonly",readonlyCommand,1,"F",0,NULL,0,0,0,0,0},
272     {"readwrite",readwriteCommand,1,"F",0,NULL,0,0,0,0,0},
273     {"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0},
274     {"object",objectCommand,3,"r",0,NULL,2,2,2,0,0},
275     {"client",clientCommand,-2,"as",0,NULL,0,0,0,0,0},
276     {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
277     {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
278     {"slowlog",slowlogCommand,-2,"a",0,NULL,0,0,0,0,0},
279     {"script",scriptCommand,-2,"s",0,NULL,0,0,0,0,0},
280     {"time",timeCommand,1,"RF",0,NULL,0,0,0,0,0},
281     {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
282     {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
283     {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0},
284     {"wait",waitCommand,3,"s",0,NULL,0,0,0,0,0},
285     {"command",commandCommand,0,"lt",0,NULL,0,0,0,0,0},
286     {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
287     {"georadius",georadiusCommand,-6,"w",0,NULL,1,1,1,0,0},
288     {"georadiusbymember",georadiusByMemberCommand,-5,"w",0,NULL,1,1,1,0,0},
289     {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
290     {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
291     {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
292     {"pfselftest",pfselftestCommand,1,"a",0,NULL,0,0,0,0,0},
293     {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
294     {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
295     {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
296     {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
297     {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
298 };
299 
300 struct evictionPoolEntry *evictionPoolAlloc(void);
301 
302 /*============================ Utility functions ============================ */
303 
304 /* Low level logging. To use only for very big messages, otherwise
305  * serverLog() is to prefer. */
serverLogRaw(int level,const char * msg)306 void serverLogRaw(int level, const char *msg) {
307     const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING };
308     const char *c = ".-*#";
309     FILE *fp;
310     char buf[64];
311     int rawmode = (level & LL_RAW);
312     int log_to_stdout = server.logfile[0] == '\0';
313 
314     level &= 0xff; /* clear flags */
315     if (level < server.verbosity) return;
316 
317     fp = log_to_stdout ? stdout : fopen(server.logfile,"a");
318     if (!fp) return;
319 
320     if (rawmode) {
321         fprintf(fp,"%s",msg);
322     } else {
323         int off;
324         struct timeval tv;
325         int role_char;
326         pid_t pid = getpid();
327 
328         gettimeofday(&tv,NULL);
329         off = strftime(buf,sizeof(buf),"%d %b %H:%M:%S.",localtime(&tv.tv_sec));
330         snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000);
331         if (server.sentinel_mode) {
332             role_char = 'X'; /* Sentinel. */
333         } else if (pid != server.pid) {
334             role_char = 'C'; /* RDB / AOF writing child. */
335         } else {
336             role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */
337         }
338         fprintf(fp,"%d:%c %s %c %s\n",
339             (int)getpid(),role_char, buf,c[level],msg);
340     }
341     fflush(fp);
342 
343     if (!log_to_stdout) fclose(fp);
344     if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg);
345 }
346 
347 /* Like serverLogRaw() but with printf-alike support. This is the function that
348  * is used across the code. The raw version is only used in order to dump
349  * the INFO output on crash. */
serverLog(int level,const char * fmt,...)350 void serverLog(int level, const char *fmt, ...) {
351     va_list ap;
352     char msg[LOG_MAX_LEN];
353 
354     if ((level&0xff) < server.verbosity) return;
355 
356     va_start(ap, fmt);
357     vsnprintf(msg, sizeof(msg), fmt, ap);
358     va_end(ap);
359 
360     serverLogRaw(level,msg);
361 }
362 
363 /* Log a fixed message without printf-alike capabilities, in a way that is
364  * safe to call from a signal handler.
365  *
366  * We actually use this only for signals that are not fatal from the point
367  * of view of Redis. Signals that are going to kill the server anyway and
368  * where we need printf-alike features are served by serverLog(). */
serverLogFromHandler(int level,const char * msg)369 void serverLogFromHandler(int level, const char *msg) {
370     int fd;
371     int log_to_stdout = server.logfile[0] == '\0';
372     char buf[64];
373 
374     if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize))
375         return;
376     fd = log_to_stdout ? STDOUT_FILENO :
377                          open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644);
378     if (fd == -1) return;
379     ll2string(buf,sizeof(buf),getpid());
380     if (write(fd,buf,strlen(buf)) == -1) goto err;
381     if (write(fd,":signal-handler (",17) == -1) goto err;
382     ll2string(buf,sizeof(buf),time(NULL));
383     if (write(fd,buf,strlen(buf)) == -1) goto err;
384     if (write(fd,") ",2) == -1) goto err;
385     if (write(fd,msg,strlen(msg)) == -1) goto err;
386     if (write(fd,"\n",1) == -1) goto err;
387 err:
388     if (!log_to_stdout) close(fd);
389 }
390 
/* Return the UNIX time in microseconds, as read with gettimeofday(). */
long long ustime(void) {
    struct timeval now;

    gettimeofday(&now, NULL);
    /* Widen the seconds before scaling them to microseconds. */
    return ((long long)now.tv_sec)*1000000 + now.tv_usec;
}
401 
402 /* Return the UNIX time in milliseconds */
mstime(void)403 mstime_t mstime(void) {
404     return ustime()/1000;
405 }
406 
/* Terminate a child process with exit code 'retcode'.
 *
 * After an RDB dump or AOF rewrite we exit from children using _exit()
 * instead of exit(), because the latter may interact with the same file
 * objects used by the parent process. However when COVERAGE_TEST is defined
 * the normal exit() is used in order to obtain the right coverage
 * information. */
void exitFromChild(int retcode) {
#ifndef COVERAGE_TEST
    _exit(retcode);
#else
    exit(retcode);
#endif
}
418 
419 /*====================== Hash table type implementation  ==================== */
420 
421 /* This is a hash table type that uses the SDS dynamic strings library as
422  * keys and redis objects as values (objects can hold SDS strings,
423  * lists, sets). */
424 
/* Dict destructor callback that releases 'val' with zfree().
 * 'privdata' is not used. */
void dictVanillaFree(void *privdata, void *val) {
    DICT_NOTUSED(privdata);
    zfree(val);
}
430 
dictListDestructor(void * privdata,void * val)431 void dictListDestructor(void *privdata, void *val)
432 {
433     DICT_NOTUSED(privdata);
434     listRelease((list*)val);
435 }
436 
dictSdsKeyCompare(void * privdata,const void * key1,const void * key2)437 int dictSdsKeyCompare(void *privdata, const void *key1,
438         const void *key2)
439 {
440     int l1,l2;
441     DICT_NOTUSED(privdata);
442 
443     l1 = sdslen((sds)key1);
444     l2 = sdslen((sds)key2);
445     if (l1 != l2) return 0;
446     return memcmp(key1, key2, l1) == 0;
447 }
448 
/* Case insensitive key comparison, used for the command lookup table and
 * other places where a case insensitive, non binary-safe comparison is
 * needed: keys are compared as C strings with strcasecmp().
 *
 * Returns 1 when the keys match, 0 otherwise. 'privdata' is not used. */
int dictSdsKeyCaseCompare(void *privdata, const void *key1,
        const void *key2)
{
    int diff;

    DICT_NOTUSED(privdata);
    diff = strcasecmp(key1, key2);
    return diff == 0;
}
458 
/* Dict destructor callback for Redis objects: drops one reference to 'val'
 * with decrRefCount(). 'privdata' is not used. */
void dictObjectDestructor(void *privdata, void *val) {
    DICT_NOTUSED(privdata);

    /* Values of swapped out keys are set to NULL: nothing to release. */
    if (val != NULL) decrRefCount(val);
}
466 
/* Dict destructor callback for sds strings: 'val' is released with
 * sdsfree(). 'privdata' is not used. */
void dictSdsDestructor(void *privdata, void *val) {
    DICT_NOTUSED(privdata);
    sdsfree(val);
}
473 
dictObjKeyCompare(void * privdata,const void * key1,const void * key2)474 int dictObjKeyCompare(void *privdata, const void *key1,
475         const void *key2)
476 {
477     const robj *o1 = key1, *o2 = key2;
478     return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
479 }
480 
dictObjHash(const void * key)481 unsigned int dictObjHash(const void *key) {
482     const robj *o = key;
483     return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
484 }
485 
/* Hash function for sds keys: hashes sdslen() bytes of 'key' with
 * dictGenHashFunction(). */
unsigned int dictSdsHash(const void *key) {
    size_t len = sdslen((char*)key);

    return dictGenHashFunction((unsigned char*)key, len);
}
489 
/* Hash function for sds keys that goes through dictGenCaseHashFunction()
 * instead of dictGenHashFunction(); it is the one paired with
 * dictSdsKeyCaseCompare() in the dict types of this file. */
unsigned int dictSdsCaseHash(const void *key) {
    size_t len = sdslen((char*)key);

    return dictGenCaseHashFunction((unsigned char*)key, len);
}
493 
dictEncObjKeyCompare(void * privdata,const void * key1,const void * key2)494 int dictEncObjKeyCompare(void *privdata, const void *key1,
495         const void *key2)
496 {
497     robj *o1 = (robj*) key1, *o2 = (robj*) key2;
498     int cmp;
499 
500     if (o1->encoding == OBJ_ENCODING_INT &&
501         o2->encoding == OBJ_ENCODING_INT)
502             return o1->ptr == o2->ptr;
503 
504     o1 = getDecodedObject(o1);
505     o2 = getDecodedObject(o2);
506     cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
507     decrRefCount(o1);
508     decrRefCount(o2);
509     return cmp;
510 }
511 
dictEncObjHash(const void * key)512 unsigned int dictEncObjHash(const void *key) {
513     robj *o = (robj*) key;
514 
515     if (sdsEncodedObject(o)) {
516         return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
517     } else {
518         if (o->encoding == OBJ_ENCODING_INT) {
519             char buf[32];
520             int len;
521 
522             len = ll2string(buf,32,(long)o->ptr);
523             return dictGenHashFunction((unsigned char*)buf, len);
524         } else {
525             unsigned int hash;
526 
527             o = getDecodedObject(o);
528             hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
529             decrRefCount(o);
530             return hash;
531         }
532     }
533 }
534 
535 /* Sets type hash table */
536 dictType setDictType = {
537     dictEncObjHash,            /* hash function */
538     NULL,                      /* key dup */
539     NULL,                      /* val dup */
540     dictEncObjKeyCompare,      /* key compare */
541     dictObjectDestructor, /* key destructor */
542     NULL                       /* val destructor */
543 };
544 
545 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
546 dictType zsetDictType = {
547     dictEncObjHash,            /* hash function */
548     NULL,                      /* key dup */
549     NULL,                      /* val dup */
550     dictEncObjKeyCompare,      /* key compare */
551     dictObjectDestructor, /* key destructor */
552     NULL                       /* val destructor */
553 };
554 
555 /* Db->dict, keys are sds strings, vals are Redis objects. */
556 dictType dbDictType = {
557     dictSdsHash,                /* hash function */
558     NULL,                       /* key dup */
559     NULL,                       /* val dup */
560     dictSdsKeyCompare,          /* key compare */
561     dictSdsDestructor,          /* key destructor */
562     dictObjectDestructor   /* val destructor */
563 };
564 
565 /* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */
566 dictType shaScriptObjectDictType = {
567     dictSdsCaseHash,            /* hash function */
568     NULL,                       /* key dup */
569     NULL,                       /* val dup */
570     dictSdsKeyCaseCompare,      /* key compare */
571     dictSdsDestructor,          /* key destructor */
572     dictObjectDestructor   /* val destructor */
573 };
574 
575 /* Db->expires */
576 dictType keyptrDictType = {
577     dictSdsHash,               /* hash function */
578     NULL,                      /* key dup */
579     NULL,                      /* val dup */
580     dictSdsKeyCompare,         /* key compare */
581     NULL,                      /* key destructor */
582     NULL                       /* val destructor */
583 };
584 
585 /* Command table. sds string -> command struct pointer. */
586 dictType commandTableDictType = {
587     dictSdsCaseHash,           /* hash function */
588     NULL,                      /* key dup */
589     NULL,                      /* val dup */
590     dictSdsKeyCaseCompare,     /* key compare */
591     dictSdsDestructor,         /* key destructor */
592     NULL                       /* val destructor */
593 };
594 
595 /* Hash type hash table (note that small hashes are represented with ziplists) */
596 dictType hashDictType = {
597     dictEncObjHash,             /* hash function */
598     NULL,                       /* key dup */
599     NULL,                       /* val dup */
600     dictEncObjKeyCompare,       /* key compare */
601     dictObjectDestructor,  /* key destructor */
602     dictObjectDestructor   /* val destructor */
603 };
604 
605 /* Keylist hash table type has unencoded redis objects as keys and
606  * lists as values. It's used for blocking operations (BLPOP) and to
607  * map swapped keys to a list of clients waiting for this keys to be loaded. */
608 dictType keylistDictType = {
609     dictObjHash,                /* hash function */
610     NULL,                       /* key dup */
611     NULL,                       /* val dup */
612     dictObjKeyCompare,          /* key compare */
613     dictObjectDestructor,  /* key destructor */
614     dictListDestructor          /* val destructor */
615 };
616 
617 /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
618  * clusterNode structures. */
619 dictType clusterNodesDictType = {
620     dictSdsHash,                /* hash function */
621     NULL,                       /* key dup */
622     NULL,                       /* val dup */
623     dictSdsKeyCompare,          /* key compare */
624     dictSdsDestructor,          /* key destructor */
625     NULL                        /* val destructor */
626 };
627 
628 /* Cluster re-addition blacklist. This maps node IDs to the time
629  * we can re-add this node. The goal is to avoid readding a removed
630  * node for some time. */
631 dictType clusterNodesBlackListDictType = {
632     dictSdsCaseHash,            /* hash function */
633     NULL,                       /* key dup */
634     NULL,                       /* val dup */
635     dictSdsKeyCaseCompare,      /* key compare */
636     dictSdsDestructor,          /* key destructor */
637     NULL                        /* val destructor */
638 };
639 
640 /* Migrate cache dict type. */
641 dictType migrateCacheDictType = {
642     dictSdsHash,                /* hash function */
643     NULL,                       /* key dup */
644     NULL,                       /* val dup */
645     dictSdsKeyCompare,          /* key compare */
646     dictSdsDestructor,          /* key destructor */
647     NULL                        /* val destructor */
648 };
649 
650 /* Replication cached script dict (server.repl_scriptcache_dict).
651  * Keys are sds SHA1 strings, while values are not used at all in the current
652  * implementation. */
653 dictType replScriptCacheDictType = {
654     dictSdsCaseHash,            /* hash function */
655     NULL,                       /* key dup */
656     NULL,                       /* val dup */
657     dictSdsKeyCaseCompare,      /* key compare */
658     dictSdsDestructor,          /* key destructor */
659     NULL                        /* val destructor */
660 };
661 
/* Tell whether a hash table is worth shrinking.
 *
 * Returns 1 when the table has more than DICT_HT_INITIAL_SIZE slots and the
 * percentage of used slots is below HASHTABLE_MIN_FILL, 0 otherwise. */
int htNeedsResize(dict *dict) {
    long long slots = dictSlots(dict);
    long long used = dictSize(dict);

    /* Tables at (or below) the initial size are never shrunk. */
    if (slots <= DICT_HT_INITIAL_SIZE) return 0;
    return used*100/slots < HASHTABLE_MIN_FILL;
}
670 
671 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
672  * we resize the hash table to save memory */
tryResizeHashTables(int dbid)673 void tryResizeHashTables(int dbid) {
674     if (htNeedsResize(server.db[dbid].dict))
675         dictResize(server.db[dbid].dict);
676     if (htNeedsResize(server.db[dbid].expires))
677         dictResize(server.db[dbid].expires);
678 }
679 
680 /* Our hash table implementation performs rehashing incrementally while
681  * we write/read from the hash table. Still if the server is idle, the hash
682  * table will use two tables for a long time. So we try to use 1 millisecond
683  * of CPU time at every call of this function to perform some rehahsing.
684  *
685  * The function returns 1 if some rehashing was performed, otherwise 0
686  * is returned. */
incrementallyRehash(int dbid)687 int incrementallyRehash(int dbid) {
688     /* Keys dictionary */
689     if (dictIsRehashing(server.db[dbid].dict)) {
690         dictRehashMilliseconds(server.db[dbid].dict,1);
691         return 1; /* already used our millisecond for this loop... */
692     }
693     /* Expires */
694     if (dictIsRehashing(server.db[dbid].expires)) {
695         dictRehashMilliseconds(server.db[dbid].expires,1);
696         return 1; /* already used our millisecond for this loop... */
697     }
698     return 0;
699 }
700 
701 /* This function is called once a background process of some kind terminates,
702  * as we want to avoid resizing the hash tables when there is a child in order
703  * to play well with copy-on-write (otherwise when a resize happens lots of
704  * memory pages are copied). The goal of this function is to update the ability
705  * for dict.c to resize the hash tables accordingly to the fact we have o not
706  * running childs. */
updateDictResizePolicy(void)707 void updateDictResizePolicy(void) {
708     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
709         dictEnableResize();
710     else
711         dictDisableResize();
712 }
713 
714 /* ======================= Cron: called every 100 ms ======================== */
715 
716 /* Helper function for the activeExpireCycle() function.
717  * This function will try to expire the key that is stored in the hash table
718  * entry 'de' of the 'expires' hash table of a Redis database.
719  *
720  * If the key is found to be expired, it is removed from the database and
721  * 1 is returned. Otherwise no operation is performed and 0 is returned.
722  *
723  * When a key is expired, server.stat_expiredkeys is incremented.
724  *
725  * The parameter 'now' is the current time in milliseconds as is passed
726  * to the function to avoid too many gettimeofday() syscalls. */
activeExpireCycleTryExpire(redisDb * db,dictEntry * de,long long now)727 int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
728     long long t = dictGetSignedIntegerVal(de);
729     if (now > t) {
730         sds key = dictGetKey(de);
731         robj *keyobj = createStringObject(key,sdslen(key));
732 
733         propagateExpire(db,keyobj);
734         dbDelete(db,keyobj);
735         notifyKeyspaceEvent(NOTIFY_EXPIRED,
736             "expired",keyobj,db->id);
737         decrRefCount(keyobj);
738         server.stat_expiredkeys++;
739         return 1;
740     } else {
741         return 0;
742     }
743 }
744 
745 /* Try to expire a few timed out keys. The algorithm used is adaptive and
746  * will use few CPU cycles if there are few expiring keys, otherwise
747  * it will get more aggressive to avoid that too much memory is used by
748  * keys that can be removed from the keyspace.
749  *
750  * No more than CRON_DBS_PER_CALL databases are tested at every
751  * iteration.
752  *
753  * This kind of call is used when Redis detects that timelimit_exit is
754  * true, so there is more work to do, and we do it more incrementally from
755  * the beforeSleep() function of the event loop.
756  *
757  * Expire cycle type:
758  *
759  * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
760  * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
761  * microseconds, and is not repeated again before the same amount of time.
762  *
763  * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
764  * executed, where the time limit is a percentage of the REDIS_HZ period
765  * as specified by the REDIS_EXPIRELOOKUPS_TIME_PERC define. */
766 
activeExpireCycle(int type)767 void activeExpireCycle(int type) {
768     /* This function has some global state in order to continue the work
769      * incrementally across calls. */
770     static unsigned int current_db = 0; /* Last DB tested. */
771     static int timelimit_exit = 0;      /* Time limit hit in previous call? */
772     static long long last_fast_cycle = 0; /* When last fast cycle ran. */
773 
774     int j, iteration = 0;
775     int dbs_per_call = CRON_DBS_PER_CALL;
776     long long start = ustime(), timelimit;
777 
778     if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
779         /* Don't start a fast cycle if the previous cycle did not exited
780          * for time limt. Also don't repeat a fast cycle for the same period
781          * as the fast cycle total duration itself. */
782         if (!timelimit_exit) return;
783         if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
784         last_fast_cycle = start;
785     }
786 
787     /* We usually should test CRON_DBS_PER_CALL per iteration, with
788      * two exceptions:
789      *
790      * 1) Don't test more DBs than we have.
791      * 2) If last time we hit the time limit, we want to scan all DBs
792      * in this iteration, as there is work to do in some DB and we don't want
793      * expired keys to use memory for too much time. */
794     if (dbs_per_call > server.dbnum || timelimit_exit)
795         dbs_per_call = server.dbnum;
796 
797     /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
798      * per iteration. Since this function gets called with a frequency of
799      * server.hz times per second, the following is the max amount of
800      * microseconds we can spend in this function. */
801     timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
802     timelimit_exit = 0;
803     if (timelimit <= 0) timelimit = 1;
804 
805     if (type == ACTIVE_EXPIRE_CYCLE_FAST)
806         timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */
807 
808     for (j = 0; j < dbs_per_call; j++) {
809         int expired;
810         redisDb *db = server.db+(current_db % server.dbnum);
811 
812         /* Increment the DB now so we are sure if we run out of time
813          * in the current DB we'll restart from the next. This allows to
814          * distribute the time evenly across DBs. */
815         current_db++;
816 
817         /* Continue to expire if at the end of the cycle more than 25%
818          * of the keys were expired. */
819         do {
820             unsigned long num, slots;
821             long long now, ttl_sum;
822             int ttl_samples;
823 
824             /* If there is nothing to expire try next DB ASAP. */
825             if ((num = dictSize(db->expires)) == 0) {
826                 db->avg_ttl = 0;
827                 break;
828             }
829             slots = dictSlots(db->expires);
830             now = mstime();
831 
832             /* When there are less than 1% filled slots getting random
833              * keys is expensive, so stop here waiting for better times...
834              * The dictionary will be resized asap. */
835             if (num && slots > DICT_HT_INITIAL_SIZE &&
836                 (num*100/slots < 1)) break;
837 
838             /* The main collection cycle. Sample random keys among keys
839              * with an expire set, checking for expired ones. */
840             expired = 0;
841             ttl_sum = 0;
842             ttl_samples = 0;
843 
844             if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
845                 num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
846 
847             while (num--) {
848                 dictEntry *de;
849                 long long ttl;
850 
851                 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
852                 ttl = dictGetSignedIntegerVal(de)-now;
853                 if (activeExpireCycleTryExpire(db,de,now)) expired++;
854                 if (ttl > 0) {
855                     /* We want the average TTL of keys yet not expired. */
856                     ttl_sum += ttl;
857                     ttl_samples++;
858                 }
859             }
860 
861             /* Update the average TTL stats for this database. */
862             if (ttl_samples) {
863                 long long avg_ttl = ttl_sum/ttl_samples;
864 
865                 /* Do a simple running average with a few samples.
866                  * We just use the current estimate with a weight of 2%
867                  * and the previous estimate with a weight of 98%. */
868                 if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
869                 db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
870             }
871 
872             /* We can't block forever here even if there are many keys to
873              * expire. So after a given amount of milliseconds return to the
874              * caller waiting for the other active expire cycle. */
875             iteration++;
876             if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
877                 long long elapsed = ustime()-start;
878 
879                 latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
880                 if (elapsed > timelimit) timelimit_exit = 1;
881             }
882             if (timelimit_exit) return;
883             /* We don't repeat the cycle if there are less than 25% of keys
884              * found expired in the current DB. */
885         } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
886     }
887 }
888 
getLRUClock(void)889 unsigned int getLRUClock(void) {
890     return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
891 }
892 
893 /* Add a sample to the operations per second array of samples. */
trackInstantaneousMetric(int metric,long long current_reading)894 void trackInstantaneousMetric(int metric, long long current_reading) {
895     long long t = mstime() - server.inst_metric[metric].last_sample_time;
896     long long ops = current_reading -
897                     server.inst_metric[metric].last_sample_count;
898     long long ops_sec;
899 
900     ops_sec = t > 0 ? (ops*1000/t) : 0;
901 
902     server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
903         ops_sec;
904     server.inst_metric[metric].idx++;
905     server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
906     server.inst_metric[metric].last_sample_time = mstime();
907     server.inst_metric[metric].last_sample_count = current_reading;
908 }
909 
910 /* Return the mean of all the samples. */
getInstantaneousMetric(int metric)911 long long getInstantaneousMetric(int metric) {
912     int j;
913     long long sum = 0;
914 
915     for (j = 0; j < STATS_METRIC_SAMPLES; j++)
916         sum += server.inst_metric[metric].samples[j];
917     return sum / STATS_METRIC_SAMPLES;
918 }
919 
920 /* Check for timeouts. Returns non-zero if the client was terminated.
921  * The function gets the current time in milliseconds as argument since
922  * it gets called multiple times in a loop, so calling gettimeofday() for
923  * each iteration would be costly without any actual gain. */
clientsCronHandleTimeout(client * c,mstime_t now_ms)924 int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
925     time_t now = now_ms/1000;
926 
927     if (server.maxidletime &&
928         !(c->flags & CLIENT_SLAVE) &&    /* no timeout for slaves */
929         !(c->flags & CLIENT_MASTER) &&   /* no timeout for masters */
930         !(c->flags & CLIENT_BLOCKED) &&  /* no timeout for BLPOP */
931         !(c->flags & CLIENT_PUBSUB) &&   /* no timeout for Pub/Sub clients */
932         (now - c->lastinteraction > server.maxidletime))
933     {
934         serverLog(LL_VERBOSE,"Closing idle client");
935         freeClient(c);
936         return 1;
937     } else if (c->flags & CLIENT_BLOCKED) {
938         /* Blocked OPS timeout is handled with milliseconds resolution.
939          * However note that the actual resolution is limited by
940          * server.hz. */
941 
942         if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
943             /* Handle blocking operation specific timeout. */
944             replyToBlockedClientTimedOut(c);
945             unblockClient(c);
946         } else if (server.cluster_enabled) {
947             /* Cluster: handle unblock & redirect of clients blocked
948              * into keys no longer served by this server. */
949             if (clusterRedirectBlockedClientIfNeeded(c))
950                 unblockClient(c);
951         }
952     }
953     return 0;
954 }
955 
956 /* The client query buffer is an sds.c string that can end with a lot of
957  * free space not used, this function reclaims space if needed.
958  *
959  * The function always returns 0 as it never terminates the client. */
clientsCronResizeQueryBuffer(client * c)960 int clientsCronResizeQueryBuffer(client *c) {
961     size_t querybuf_size = sdsAllocSize(c->querybuf);
962     time_t idletime = server.unixtime - c->lastinteraction;
963 
964     /* There are two conditions to resize the query buffer:
965      * 1) Query buffer is > BIG_ARG and too big for latest peak.
966      * 2) Client is inactive and the buffer is bigger than 1k. */
967     if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
968          (querybuf_size/(c->querybuf_peak+1)) > 2) ||
969          (querybuf_size > 1024 && idletime > 2))
970     {
971         /* Only resize the query buffer if it is actually wasting space. */
972         if (sdsavail(c->querybuf) > 1024) {
973             c->querybuf = sdsRemoveFreeSpace(c->querybuf);
974         }
975     }
976     /* Reset the peak again to capture the peak memory usage in the next
977      * cycle. */
978     c->querybuf_peak = 0;
979     return 0;
980 }
981 
982 #define CLIENTS_CRON_MIN_ITERATIONS 5
clientsCron(void)983 void clientsCron(void) {
984     /* Make sure to process at least numclients/server.hz of clients
985      * per call. Since this function is called server.hz times per second
986      * we are sure that in the worst case we process all the clients in 1
987      * second. */
988     int numclients = listLength(server.clients);
989     int iterations = numclients/server.hz;
990     mstime_t now = mstime();
991 
992     /* Process at least a few clients while we are at it, even if we need
993      * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
994      * of processing each client once per second. */
995     if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
996         iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
997                      numclients : CLIENTS_CRON_MIN_ITERATIONS;
998 
999     while(listLength(server.clients) && iterations--) {
1000         client *c;
1001         listNode *head;
1002 
1003         /* Rotate the list, take the current head, process.
1004          * This way if the client must be removed from the list it's the
1005          * first element and we don't incur into O(N) computation. */
1006         listRotate(server.clients);
1007         head = listFirst(server.clients);
1008         c = listNodeValue(head);
1009         /* The following functions do different service checks on the client.
1010          * The protocol is that they return non-zero if the client was
1011          * terminated. */
1012         if (clientsCronHandleTimeout(c,now)) continue;
1013         if (clientsCronResizeQueryBuffer(c)) continue;
1014     }
1015 }
1016 
1017 /* This function handles 'background' operations we are required to do
1018  * incrementally in Redis databases, such as active key expiring, resizing,
1019  * rehashing. */
databasesCron(void)1020 void databasesCron(void) {
1021     /* Expire keys by random sampling. Not required for slaves
1022      * as master will synthesize DELs for us. */
1023     if (server.active_expire_enabled && server.masterhost == NULL)
1024         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
1025 
1026     /* Perform hash tables rehashing if needed, but only if there are no
1027      * other processes saving the DB on disk. Otherwise rehashing is bad
1028      * as will cause a lot of copy-on-write of memory pages. */
1029     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
1030         /* We use global counters so if we stop the computation at a given
1031          * DB we'll be able to start from the successive in the next
1032          * cron loop iteration. */
1033         static unsigned int resize_db = 0;
1034         static unsigned int rehash_db = 0;
1035         int dbs_per_call = CRON_DBS_PER_CALL;
1036         int j;
1037 
1038         /* Don't test more DBs than we have. */
1039         if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
1040 
1041         /* Resize */
1042         for (j = 0; j < dbs_per_call; j++) {
1043             tryResizeHashTables(resize_db % server.dbnum);
1044             resize_db++;
1045         }
1046 
1047         /* Rehash */
1048         if (server.activerehashing) {
1049             for (j = 0; j < dbs_per_call; j++) {
1050                 int work_done = incrementallyRehash(rehash_db % server.dbnum);
1051                 rehash_db++;
1052                 if (work_done) {
1053                     /* If the function did some work, stop here, we'll do
1054                      * more at the next cron loop. */
1055                     break;
1056                 }
1057             }
1058         }
1059     }
1060 }
1061 
1062 /* We take a cached value of the unix time in the global state because with
1063  * virtual memory and aging there is to store the current time in objects at
1064  * every object access, and accuracy is not needed. To access a global var is
1065  * a lot faster than calling time(NULL) */
updateCachedTime(void)1066 void updateCachedTime(void) {
1067     server.unixtime = time(NULL);
1068     server.mstime = mstime();
1069 }
1070 
1071 /* This is our timer interrupt, called server.hz times per second.
1072  * Here is where we do a number of things that need to be done asynchronously.
1073  * For instance:
1074  *
1075  * - Active expired keys collection (it is also performed in a lazy way on
1076  *   lookup).
1077  * - Software watchdog.
1078  * - Update some statistic.
1079  * - Incremental rehashing of the DBs hash tables.
1080  * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
1081  * - Clients timeout of different kinds.
1082  * - Replication reconnection.
1083  * - Many more...
1084  *
1085  * Everything directly called here will be called server.hz times per second,
1086  * so in order to throttle execution of things we want to do less frequently
1087  * a macro is used: run_with_period(milliseconds) { .... }
1088  */
1089 
serverCron(struct aeEventLoop * eventLoop,long long id,void * clientData)1090 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1091     int j;
1092     UNUSED(eventLoop);
1093     UNUSED(id);
1094     UNUSED(clientData);
1095 
1096     /* Software watchdog: deliver the SIGALRM that will reach the signal
1097      * handler if we don't return here fast enough. */
1098     if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
1099 
1100     /* Update the time cache. */
1101     updateCachedTime();
1102 
1103     run_with_period(100) {
1104         trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
1105         trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
1106                 server.stat_net_input_bytes);
1107         trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
1108                 server.stat_net_output_bytes);
1109     }
1110 
1111     /* We have just LRU_BITS bits per object for LRU information.
1112      * So we use an (eventually wrapping) LRU clock.
1113      *
1114      * Note that even if the counter wraps it's not a big problem,
1115      * everything will still work but some object will appear younger
1116      * to Redis. However for this to happen a given object should never be
1117      * touched for all the time needed to the counter to wrap, which is
1118      * not likely.
1119      *
1120      * Note that you can change the resolution altering the
1121      * LRU_CLOCK_RESOLUTION define. */
1122     server.lruclock = getLRUClock();
1123 
1124     /* Record the max memory used since the server was started. */
1125     if (zmalloc_used_memory() > server.stat_peak_memory)
1126         server.stat_peak_memory = zmalloc_used_memory();
1127 
1128     /* Sample the RSS here since this is a relatively slow call. */
1129     server.resident_set_size = zmalloc_get_rss();
1130 
1131     /* We received a SIGTERM, shutting down here in a safe way, as it is
1132      * not ok doing so inside the signal handler. */
1133     if (server.shutdown_asap) {
1134         if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
1135         serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
1136         server.shutdown_asap = 0;
1137     }
1138 
1139     /* Show some info about non-empty databases */
1140     run_with_period(5000) {
1141         for (j = 0; j < server.dbnum; j++) {
1142             long long size, used, vkeys;
1143 
1144             size = dictSlots(server.db[j].dict);
1145             used = dictSize(server.db[j].dict);
1146             vkeys = dictSize(server.db[j].expires);
1147             if (used || vkeys) {
1148                 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1149                 /* dictPrintStats(server.dict); */
1150             }
1151         }
1152     }
1153 
1154     /* Show information about connected clients */
1155     if (!server.sentinel_mode) {
1156         run_with_period(5000) {
1157             serverLog(LL_VERBOSE,
1158                 "%lu clients connected (%lu slaves), %zu bytes in use",
1159                 listLength(server.clients)-listLength(server.slaves),
1160                 listLength(server.slaves),
1161                 zmalloc_used_memory());
1162         }
1163     }
1164 
1165     /* We need to do a few operations on clients asynchronously. */
1166     clientsCron();
1167 
1168     /* Handle background operations on Redis databases. */
1169     databasesCron();
1170 
1171     /* Start a scheduled AOF rewrite if this was requested by the user while
1172      * a BGSAVE was in progress. */
1173     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
1174         server.aof_rewrite_scheduled)
1175     {
1176         rewriteAppendOnlyFileBackground();
1177     }
1178 
1179     /* Check if a background saving or AOF rewrite in progress terminated. */
1180     if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
1181         ldbPendingChildren())
1182     {
1183         int statloc;
1184         pid_t pid;
1185 
1186         if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1187             int exitcode = WEXITSTATUS(statloc);
1188             int bysignal = 0;
1189 
1190             if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
1191 
1192             if (pid == -1) {
1193                 serverLog(LL_WARNING,"wait3() returned an error: %s. "
1194                     "rdb_child_pid = %d, aof_child_pid = %d",
1195                     strerror(errno),
1196                     (int) server.rdb_child_pid,
1197                     (int) server.aof_child_pid);
1198             } else if (pid == server.rdb_child_pid) {
1199                 backgroundSaveDoneHandler(exitcode,bysignal);
1200             } else if (pid == server.aof_child_pid) {
1201                 backgroundRewriteDoneHandler(exitcode,bysignal);
1202             } else {
1203                 if (!ldbRemoveChild(pid)) {
1204                     serverLog(LL_WARNING,
1205                         "Warning, detected child with unmatched pid: %ld",
1206                         (long)pid);
1207                 }
1208             }
1209             updateDictResizePolicy();
1210         }
1211     } else {
1212         /* If there is not a background saving/rewrite in progress check if
1213          * we have to save/rewrite now */
1214          for (j = 0; j < server.saveparamslen; j++) {
1215             struct saveparam *sp = server.saveparams+j;
1216 
1217             /* Save if we reached the given amount of changes,
1218              * the given amount of seconds, and if the latest bgsave was
1219              * successful or if, in case of an error, at least
1220              * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
1221             if (server.dirty >= sp->changes &&
1222                 server.unixtime-server.lastsave > sp->seconds &&
1223                 (server.unixtime-server.lastbgsave_try >
1224                  CONFIG_BGSAVE_RETRY_DELAY ||
1225                  server.lastbgsave_status == C_OK))
1226             {
1227                 serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
1228                     sp->changes, (int)sp->seconds);
1229                 rdbSaveBackground(server.rdb_filename);
1230                 break;
1231             }
1232          }
1233 
1234          /* Trigger an AOF rewrite if needed */
1235          if (server.rdb_child_pid == -1 &&
1236              server.aof_child_pid == -1 &&
1237              server.aof_rewrite_perc &&
1238              server.aof_current_size > server.aof_rewrite_min_size)
1239          {
1240             long long base = server.aof_rewrite_base_size ?
1241                             server.aof_rewrite_base_size : 1;
1242             long long growth = (server.aof_current_size*100/base) - 100;
1243             if (growth >= server.aof_rewrite_perc) {
1244                 serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
1245                 rewriteAppendOnlyFileBackground();
1246             }
1247          }
1248     }
1249 
1250 
1251     /* AOF postponed flush: Try at every cron cycle if the slow fsync
1252      * completed. */
1253     if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
1254 
1255     /* AOF write errors: in this case we have a buffer to flush as well and
1256      * clear the AOF error in case of success to make the DB writable again,
1257      * however to try every second is enough in case of 'hz' is set to
1258      * an higher frequency. */
1259     run_with_period(1000) {
1260         if (server.aof_last_write_status == C_ERR)
1261             flushAppendOnlyFile(0);
1262     }
1263 
1264     /* Close clients that need to be closed asynchronous */
1265     freeClientsInAsyncFreeQueue();
1266 
1267     /* Clear the paused clients flag if needed. */
1268     clientsArePaused(); /* Don't check return value, just use the side effect. */
1269 
1270     /* Replication cron function -- used to reconnect to master,
1271      * detect transfer failures, start background RDB transfers and so forth. */
1272     run_with_period(1000) replicationCron();
1273 
1274     /* Run the Redis Cluster cron. */
1275     run_with_period(100) {
1276         if (server.cluster_enabled) clusterCron();
1277     }
1278 
1279     /* Run the Sentinel timer if we are in sentinel mode. */
1280     run_with_period(100) {
1281         if (server.sentinel_mode) sentinelTimer();
1282     }
1283 
1284     /* Cleanup expired MIGRATE cached sockets. */
1285     run_with_period(1000) {
1286         migrateCloseTimedoutSockets();
1287     }
1288 
1289     /* Start a scheduled BGSAVE if the corresponding flag is set. This is
1290      * useful when we are forced to postpone a BGSAVE because an AOF
1291      * rewrite is in progress.
1292      *
1293      * Note: this code must be after the replicationCron() call above so
1294      * make sure when refactoring this file to keep this order. This is useful
1295      * because we want to give priority to RDB savings for replication. */
1296     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
1297         server.rdb_bgsave_scheduled &&
1298         (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
1299          server.lastbgsave_status == C_OK))
1300     {
1301         if (rdbSaveBackground(server.rdb_filename) == C_OK)
1302             server.rdb_bgsave_scheduled = 0;
1303     }
1304 
1305     server.cronloops++;
1306     return 1000/server.hz;
1307 }
1308 
1309 /* This function gets called every time Redis is entering the
1310  * main loop of the event driven library, that is, before to sleep
1311  * for ready file descriptors. */
beforeSleep(struct aeEventLoop * eventLoop)1312 void beforeSleep(struct aeEventLoop *eventLoop) {
1313     UNUSED(eventLoop);
1314 
1315     /* Call the Redis Cluster before sleep function. Note that this function
1316      * may change the state of Redis Cluster (from ok to fail or vice versa),
1317      * so it's a good idea to call it before serving the unblocked clients
1318      * later in this function. */
1319     if (server.cluster_enabled) clusterBeforeSleep();
1320 
1321     /* Run a fast expire cycle (the called function will return
1322      * ASAP if a fast cycle is not needed). */
1323     if (server.active_expire_enabled && server.masterhost == NULL)
1324         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
1325 
1326     /* Send all the slaves an ACK request if at least one client blocked
1327      * during the previous event loop iteration. */
1328     if (server.get_ack_from_slaves) {
1329         robj *argv[3];
1330 
1331         argv[0] = createStringObject("REPLCONF",8);
1332         argv[1] = createStringObject("GETACK",6);
1333         argv[2] = createStringObject("*",1); /* Not used argument. */
1334         replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
1335         decrRefCount(argv[0]);
1336         decrRefCount(argv[1]);
1337         decrRefCount(argv[2]);
1338         server.get_ack_from_slaves = 0;
1339     }
1340 
1341     /* Unblock all the clients blocked for synchronous replication
1342      * in WAIT. */
1343     if (listLength(server.clients_waiting_acks))
1344         processClientsWaitingReplicas();
1345 
1346     /* Try to process pending commands for clients that were just unblocked. */
1347     if (listLength(server.unblocked_clients))
1348         processUnblockedClients();
1349 
1350     /* Write the AOF buffer on disk */
1351     flushAppendOnlyFile(0);
1352 
1353     /* Handle writes with pending output buffers. */
1354     handleClientsWithPendingWrites();
1355 }
1356 
1357 /* =========================== Server initialization ======================== */
1358 
createSharedObjects(void)1359 void createSharedObjects(void) {
1360     int j;
1361 
1362     shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
1363     shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
1364     shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
1365     shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
1366     shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
1367     shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
1368     shared.cnegone = createObject(OBJ_STRING,sdsnew(":-1\r\n"));
1369     shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
1370     shared.nullmultibulk = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
1371     shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n"));
1372     shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
1373     shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
1374     shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
1375     shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
1376         "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
1377     shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
1378         "-ERR no such key\r\n"));
1379     shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
1380         "-ERR syntax error\r\n"));
1381     shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
1382         "-ERR source and destination objects are the same\r\n"));
1383     shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
1384         "-ERR index out of range\r\n"));
1385     shared.noscripterr = createObject(OBJ_STRING,sdsnew(
1386         "-NOSCRIPT No matching script. Please use EVAL.\r\n"));
1387     shared.loadingerr = createObject(OBJ_STRING,sdsnew(
1388         "-LOADING Redis is loading the dataset in memory\r\n"));
1389     shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
1390         "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
1391     shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
1392         "-MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n"));
1393     shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
1394         "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n"));
1395     shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
1396         "-READONLY You can't write against a read only slave.\r\n"));
1397     shared.noautherr = createObject(OBJ_STRING,sdsnew(
1398         "-NOAUTH Authentication required.\r\n"));
1399     shared.oomerr = createObject(OBJ_STRING,sdsnew(
1400         "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
1401     shared.execaborterr = createObject(OBJ_STRING,sdsnew(
1402         "-EXECABORT Transaction discarded because of previous errors.\r\n"));
1403     shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
1404         "-NOREPLICAS Not enough good slaves to write.\r\n"));
1405     shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
1406         "-BUSYKEY Target key name already exists.\r\n"));
1407     shared.space = createObject(OBJ_STRING,sdsnew(" "));
1408     shared.colon = createObject(OBJ_STRING,sdsnew(":"));
1409     shared.plus = createObject(OBJ_STRING,sdsnew("+"));
1410 
1411     for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
1412         char dictid_str[64];
1413         int dictid_len;
1414 
1415         dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
1416         shared.select[j] = createObject(OBJ_STRING,
1417             sdscatprintf(sdsempty(),
1418                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
1419                 dictid_len, dictid_str));
1420     }
1421     shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
1422     shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
1423     shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
1424     shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
1425     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
1426     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
1427     shared.del = createStringObject("DEL",3);
1428     shared.rpop = createStringObject("RPOP",4);
1429     shared.lpop = createStringObject("LPOP",4);
1430     shared.lpush = createStringObject("LPUSH",5);
1431     for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
1432         shared.integers[j] = createObject(OBJ_STRING,(void*)(long)j);
1433         shared.integers[j]->encoding = OBJ_ENCODING_INT;
1434     }
1435     for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {
1436         shared.mbulkhdr[j] = createObject(OBJ_STRING,
1437             sdscatprintf(sdsempty(),"*%d\r\n",j));
1438         shared.bulkhdr[j] = createObject(OBJ_STRING,
1439             sdscatprintf(sdsempty(),"$%d\r\n",j));
1440     }
1441     /* The following two shared objects, minstring and maxstrings, are not
1442      * actually used for their value but as a special object meaning
1443      * respectively the minimum possible string and the maximum possible
1444      * string in string comparisons for the ZRANGEBYLEX command. */
1445     shared.minstring = createStringObject("minstring",9);
1446     shared.maxstring = createStringObject("maxstring",9);
1447 }
1448 
initServerConfig(void)1449 void initServerConfig(void) {
1450     int j;
1451 
1452     getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
1453     server.configfile = NULL;
1454     server.executable = NULL;
1455     server.hz = CONFIG_DEFAULT_HZ;
1456     server.runid[CONFIG_RUN_ID_SIZE] = '\0';
1457     server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
1458     server.port = CONFIG_DEFAULT_SERVER_PORT;
1459     server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
1460     server.bindaddr_count = 0;
1461     server.unixsocket = NULL;
1462     server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
1463     server.ipfd_count = 0;
1464     server.sofd = -1;
1465     server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE;
1466     server.dbnum = CONFIG_DEFAULT_DBNUM;
1467     server.verbosity = CONFIG_DEFAULT_VERBOSITY;
1468     server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
1469     server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE;
1470     server.active_expire_enabled = 1;
1471     server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
1472     server.saveparams = NULL;
1473     server.loading = 0;
1474     server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);
1475     server.syslog_enabled = CONFIG_DEFAULT_SYSLOG_ENABLED;
1476     server.syslog_ident = zstrdup(CONFIG_DEFAULT_SYSLOG_IDENT);
1477     server.syslog_facility = LOG_LOCAL0;
1478     server.daemonize = CONFIG_DEFAULT_DAEMONIZE;
1479     server.supervised = 0;
1480     server.supervised_mode = SUPERVISED_NONE;
1481     server.aof_state = AOF_OFF;
1482     server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC;
1483     server.aof_no_fsync_on_rewrite = CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE;
1484     server.aof_rewrite_perc = AOF_REWRITE_PERC;
1485     server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
1486     server.aof_rewrite_base_size = 0;
1487     server.aof_rewrite_scheduled = 0;
1488     server.aof_last_fsync = time(NULL);
1489     server.aof_rewrite_time_last = -1;
1490     server.aof_rewrite_time_start = -1;
1491     server.aof_lastbgrewrite_status = C_OK;
1492     server.aof_delayed_fsync = 0;
1493     server.aof_fd = -1;
1494     server.aof_selected_db = -1; /* Make sure the first time will not match */
1495     server.aof_flush_postponed_start = 0;
1496     server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
1497     server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
1498     server.pidfile = NULL;
1499     server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
1500     server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
1501     server.requirepass = NULL;
1502     server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION;
1503     server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM;
1504     server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
1505     server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING;
1506     server.notify_keyspace_events = 0;
1507     server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
1508     server.bpop_blocked_clients = 0;
1509     server.maxmemory = CONFIG_DEFAULT_MAXMEMORY;
1510     server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY;
1511     server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES;
1512     server.hash_max_ziplist_entries = OBJ_HASH_MAX_ZIPLIST_ENTRIES;
1513     server.hash_max_ziplist_value = OBJ_HASH_MAX_ZIPLIST_VALUE;
1514     server.list_max_ziplist_size = OBJ_LIST_MAX_ZIPLIST_SIZE;
1515     server.list_compress_depth = OBJ_LIST_COMPRESS_DEPTH;
1516     server.set_max_intset_entries = OBJ_SET_MAX_INTSET_ENTRIES;
1517     server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES;
1518     server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE;
1519     server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES;
1520     server.shutdown_asap = 0;
1521     server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
1522     server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
1523     server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE;
1524     server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG;
1525     server.cluster_enabled = 0;
1526     server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT;
1527     server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER;
1528     server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY;
1529     server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE;
1530     server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
1531     server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
1532     server.next_client_id = 1; /* Client IDs, start from 1 .*/
1533     server.loading_process_events_interval_bytes = (1024*1024*2);
1534 
1535     server.lruclock = getLRUClock();
1536     resetServerSaveParams();
1537 
1538     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
1539     appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
1540     appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1541     /* Replication related */
1542     server.masterauth = NULL;
1543     server.masterhost = NULL;
1544     server.masterport = 6379;
1545     server.master = NULL;
1546     server.cached_master = NULL;
1547     server.repl_master_initial_offset = -1;
1548     server.repl_state = REPL_STATE_NONE;
1549     server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
1550     server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
1551     server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
1552     server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
1553     server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
1554     server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
1555     server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
1556     server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY;
1557     server.slave_announce_ip = CONFIG_DEFAULT_SLAVE_ANNOUNCE_IP;
1558     server.slave_announce_port = CONFIG_DEFAULT_SLAVE_ANNOUNCE_PORT;
1559     server.master_repl_offset = 0;
1560 
1561     /* Replication partial resync backlog */
1562     server.repl_backlog = NULL;
1563     server.repl_backlog_size = CONFIG_DEFAULT_REPL_BACKLOG_SIZE;
1564     server.repl_backlog_histlen = 0;
1565     server.repl_backlog_idx = 0;
1566     server.repl_backlog_off = 0;
1567     server.repl_backlog_time_limit = CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
1568     server.repl_no_slaves_since = time(NULL);
1569 
1570     /* Client output buffer limits */
1571     for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
1572         server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
1573 
1574     /* Double constants initialization */
1575     R_Zero = 0.0;
1576     R_PosInf = 1.0/R_Zero;
1577     R_NegInf = -1.0/R_Zero;
1578     R_Nan = R_Zero/R_Zero;
1579 
1580     /* Command table -- we initiialize it here as it is part of the
1581      * initial configuration, since command names may be changed via
1582      * redis.conf using the rename-command directive. */
1583     server.commands = dictCreate(&commandTableDictType,NULL);
1584     server.orig_commands = dictCreate(&commandTableDictType,NULL);
1585     populateCommandTable();
1586     server.delCommand = lookupCommandByCString("del");
1587     server.multiCommand = lookupCommandByCString("multi");
1588     server.lpushCommand = lookupCommandByCString("lpush");
1589     server.lpopCommand = lookupCommandByCString("lpop");
1590     server.rpopCommand = lookupCommandByCString("rpop");
1591     server.sremCommand = lookupCommandByCString("srem");
1592     server.execCommand = lookupCommandByCString("exec");
1593 
1594     /* Slow log */
1595     server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;
1596     server.slowlog_max_len = CONFIG_DEFAULT_SLOWLOG_MAX_LEN;
1597 
1598     /* Latency monitor */
1599     server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD;
1600 
1601     /* Debugging */
1602     server.assert_failed = "<no assertion failed>";
1603     server.assert_file = "<no file>";
1604     server.assert_line = 0;
1605     server.bug_report_start = 0;
1606     server.watchdog_period = 0;
1607 }
1608 
1609 extern char **environ;
1610 
1611 /* Restart the server, executing the same executable that started this
1612  * instance, with the same arguments and configuration file.
1613  *
1614  * The function is designed to directly call execve() so that the new
1615  * server instance will retain the PID of the previous one.
1616  *
1617  * The list of flags, that may be bitwise ORed together, alter the
1618  * behavior of this function:
1619  *
1620  * RESTART_SERVER_NONE              No flags.
1621  * RESTART_SERVER_GRACEFULLY        Do a proper shutdown before restarting.
1622  * RESTART_SERVER_CONFIG_REWRITE    Rewrite the config file before restarting.
1623  *
1624  * On success the function does not return, because the process turns into
1625  * a different process. On error C_ERR is returned. */
restartServer(int flags,mstime_t delay)1626 int restartServer(int flags, mstime_t delay) {
1627     int j;
1628 
1629     /* Check if we still have accesses to the executable that started this
1630      * server instance. */
1631     if (access(server.executable,X_OK) == -1) return C_ERR;
1632 
1633     /* Config rewriting. */
1634     if (flags & RESTART_SERVER_CONFIG_REWRITE &&
1635         server.configfile &&
1636         rewriteConfig(server.configfile) == -1) return C_ERR;
1637 
1638     /* Perform a proper shutdown. */
1639     if (flags & RESTART_SERVER_GRACEFULLY &&
1640         prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK) return C_ERR;
1641 
1642     /* Close all file descriptors, with the exception of stdin, stdout, strerr
1643      * which are useful if we restart a Redis server which is not daemonized. */
1644     for (j = 3; j < (int)server.maxclients + 1024; j++) close(j);
1645 
1646     /* Execute the server with the original command line. */
1647     if (delay) usleep(delay*1000);
1648     execve(server.executable,server.exec_argv,environ);
1649 
1650     /* If an error occurred here, there is nothing we can do, but exit. */
1651     _exit(1);
1652 
1653     return C_ERR; /* Never reached. */
1654 }
1655 
1656 /* This function will try to raise the max number of open files accordingly to
1657  * the configured max number of clients. It also reserves a number of file
1658  * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
1659  * persistence, listening sockets, log files and so forth.
1660  *
1661  * If it will not be possible to set the limit accordingly to the configured
1662  * max number of clients, the function will do the reverse setting
1663  * server.maxclients to the value that we can actually handle. */
adjustOpenFilesLimit(void)1664 void adjustOpenFilesLimit(void) {
1665     rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS;
1666     struct rlimit limit;
1667 
1668     if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {
1669         serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
1670             strerror(errno));
1671         server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS;
1672     } else {
1673         rlim_t oldlimit = limit.rlim_cur;
1674 
1675         /* Set the max number of files if the current limit is not enough
1676          * for our needs. */
1677         if (oldlimit < maxfiles) {
1678             rlim_t bestlimit;
1679             int setrlimit_error = 0;
1680 
1681             /* Try to set the file limit to match 'maxfiles' or at least
1682              * to the higher value supported less than maxfiles. */
1683             bestlimit = maxfiles;
1684             while(bestlimit > oldlimit) {
1685                 rlim_t decr_step = 16;
1686 
1687                 limit.rlim_cur = bestlimit;
1688                 limit.rlim_max = bestlimit;
1689                 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break;
1690                 setrlimit_error = errno;
1691 
1692                 /* We failed to set file limit to 'bestlimit'. Try with a
1693                  * smaller limit decrementing by a few FDs per iteration. */
1694                 if (bestlimit < decr_step) break;
1695                 bestlimit -= decr_step;
1696             }
1697 
1698             /* Assume that the limit we get initially is still valid if
1699              * our last try was even lower. */
1700             if (bestlimit < oldlimit) bestlimit = oldlimit;
1701 
1702             if (bestlimit < maxfiles) {
1703                 int old_maxclients = server.maxclients;
1704                 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS;
1705                 if (server.maxclients < 1) {
1706                     serverLog(LL_WARNING,"Your current 'ulimit -n' "
1707                         "of %llu is not enough for the server to start. "
1708                         "Please increase your open file limit to at least "
1709                         "%llu. Exiting.",
1710                         (unsigned long long) oldlimit,
1711                         (unsigned long long) maxfiles);
1712                     exit(1);
1713                 }
1714                 serverLog(LL_WARNING,"You requested maxclients of %d "
1715                     "requiring at least %llu max file descriptors.",
1716                     old_maxclients,
1717                     (unsigned long long) maxfiles);
1718                 serverLog(LL_WARNING,"Server can't set maximum open files "
1719                     "to %llu because of OS error: %s.",
1720                     (unsigned long long) maxfiles, strerror(setrlimit_error));
1721                 serverLog(LL_WARNING,"Current maximum open files is %llu. "
1722                     "maxclients has been reduced to %d to compensate for "
1723                     "low ulimit. "
1724                     "If you need higher maxclients increase 'ulimit -n'.",
1725                     (unsigned long long) bestlimit, server.maxclients);
1726             } else {
1727                 serverLog(LL_NOTICE,"Increased maximum number of open files "
1728                     "to %llu (it was originally set to %llu).",
1729                     (unsigned long long) maxfiles,
1730                     (unsigned long long) oldlimit);
1731             }
1732         }
1733     }
1734 }
1735 
/* Warn when the configured server.tcp_backlog is larger than the value
 * read from /proc/sys/net/core/somaxconn. The whole check is compiled in
 * only when HAVE_PROC_SOMAXCONN is defined; if the file cannot be opened
 * or read, nothing is logged. */
void checkTcpBacklogSettings(void) {
#ifdef HAVE_PROC_SOMAXCONN
    char line[1024];
    FILE *somaxconn_file = fopen("/proc/sys/net/core/somaxconn","r");

    if (somaxconn_file == NULL) return;

    if (fgets(line,sizeof(line),somaxconn_file) != NULL) {
        int kernel_limit = atoi(line);

        /* Values that parse as zero or negative are ignored. */
        if (kernel_limit > 0 && kernel_limit < server.tcp_backlog) {
            serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, kernel_limit);
        }
    }
    fclose(somaxconn_file);
#endif
}
1752 
1753 /* Initialize a set of file descriptors to listen to the specified 'port'
1754  * binding the addresses specified in the Redis server configuration.
1755  *
1756  * The listening file descriptors are stored in the integer array 'fds'
1757  * and their number is set in '*count'.
1758  *
1759  * The addresses to bind are specified in the global server.bindaddr array
1760  * and their number is server.bindaddr_count. If the server configuration
1761  * contains no specific addresses to bind, this function will try to
1762  * bind * (all addresses) for both the IPv4 and IPv6 protocols.
1763  *
1764  * On success the function returns C_OK.
1765  *
1766  * On error the function returns C_ERR. For the function to be on
1767  * error, at least one of the server.bindaddr addresses was
1768  * impossible to bind, or no bind addresses were specified in the server
1769  * configuration but the function is not able to bind * for at least
1770  * one of the IPv4 or IPv6 protocols. */
listenToPort(int port,int * fds,int * count)1771 int listenToPort(int port, int *fds, int *count) {
1772     int j;
1773 
1774     /* Force binding of 0.0.0.0 if no bind address is specified, always
1775      * entering the loop if j == 0. */
1776     if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
1777     for (j = 0; j < server.bindaddr_count || j == 0; j++) {
1778         if (server.bindaddr[j] == NULL) {
1779             /* Bind * for both IPv6 and IPv4, we enter here only if
1780              * server.bindaddr_count == 0. */
1781             fds[*count] = anetTcp6Server(server.neterr,port,NULL,
1782                 server.tcp_backlog);
1783             if (fds[*count] != ANET_ERR) {
1784                 anetNonBlock(NULL,fds[*count]);
1785                 (*count)++;
1786 
1787                 /* Bind the IPv4 address as well. */
1788                 fds[*count] = anetTcpServer(server.neterr,port,NULL,
1789                     server.tcp_backlog);
1790                 if (fds[*count] != ANET_ERR) {
1791                     anetNonBlock(NULL,fds[*count]);
1792                     (*count)++;
1793                 }
1794             }
1795             /* Exit the loop if we were able to bind * on IPv4 and IPv6,
1796              * otherwise fds[*count] will be ANET_ERR and we'll print an
1797              * error and return to the caller with an error. */
1798             if (*count == 2) break;
1799         } else if (strchr(server.bindaddr[j],':')) {
1800             /* Bind IPv6 address. */
1801             fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
1802                 server.tcp_backlog);
1803         } else {
1804             /* Bind IPv4 address. */
1805             fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
1806                 server.tcp_backlog);
1807         }
1808         if (fds[*count] == ANET_ERR) {
1809             serverLog(LL_WARNING,
1810                 "Creating Server TCP listening socket %s:%d: %s",
1811                 server.bindaddr[j] ? server.bindaddr[j] : "*",
1812                 port, server.neterr);
1813             return C_ERR;
1814         }
1815         anetNonBlock(NULL,fds[*count]);
1816         (*count)++;
1817     }
1818     return C_OK;
1819 }
1820 
1821 /* Resets the stats that we expose via INFO or other means that we want
1822  * to reset via CONFIG RESETSTAT. The function is also used in order to
1823  * initialize these fields in initServer() at server startup. */
resetServerStats(void)1824 void resetServerStats(void) {
1825     int j;
1826 
1827     server.stat_numcommands = 0;
1828     server.stat_numconnections = 0;
1829     server.stat_expiredkeys = 0;
1830     server.stat_evictedkeys = 0;
1831     server.stat_keyspace_misses = 0;
1832     server.stat_keyspace_hits = 0;
1833     server.stat_fork_time = 0;
1834     server.stat_fork_rate = 0;
1835     server.stat_rejected_conn = 0;
1836     server.stat_sync_full = 0;
1837     server.stat_sync_partial_ok = 0;
1838     server.stat_sync_partial_err = 0;
1839     for (j = 0; j < STATS_METRIC_COUNT; j++) {
1840         server.inst_metric[j].idx = 0;
1841         server.inst_metric[j].last_sample_time = mstime();
1842         server.inst_metric[j].last_sample_count = 0;
1843         memset(server.inst_metric[j].samples,0,
1844             sizeof(server.inst_metric[j].samples));
1845     }
1846     server.stat_net_input_bytes = 0;
1847     server.stat_net_output_bytes = 0;
1848     server.aof_delayed_fsync = 0;
1849 }
1850 
initServer(void)1851 void initServer(void) {
1852     int j;
1853 
1854     signal(SIGHUP, SIG_IGN);
1855     signal(SIGPIPE, SIG_IGN);
1856     setupSignalHandlers();
1857 
1858     if (server.syslog_enabled) {
1859         openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
1860             server.syslog_facility);
1861     }
1862 
1863     server.pid = getpid();
1864     server.current_client = NULL;
1865     server.clients = listCreate();
1866     server.clients_to_close = listCreate();
1867     server.slaves = listCreate();
1868     server.monitors = listCreate();
1869     server.clients_pending_write = listCreate();
1870     server.slaveseldb = -1; /* Force to emit the first SELECT command. */
1871     server.unblocked_clients = listCreate();
1872     server.ready_keys = listCreate();
1873     server.clients_waiting_acks = listCreate();
1874     server.get_ack_from_slaves = 0;
1875     server.clients_paused = 0;
1876     server.system_memory_size = zmalloc_get_memory_size();
1877 
1878     createSharedObjects();
1879     adjustOpenFilesLimit();
1880     server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
1881     server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1882 
1883     /* Open the TCP listening socket for the user commands. */
1884     if (server.port != 0 &&
1885         listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
1886         exit(1);
1887 
1888     /* Open the listening Unix domain socket. */
1889     if (server.unixsocket != NULL) {
1890         unlink(server.unixsocket); /* don't care if this fails */
1891         server.sofd = anetUnixServer(server.neterr,server.unixsocket,
1892             server.unixsocketperm, server.tcp_backlog);
1893         if (server.sofd == ANET_ERR) {
1894             serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
1895             exit(1);
1896         }
1897         anetNonBlock(NULL,server.sofd);
1898     }
1899 
1900     /* Abort if there are no listening sockets at all. */
1901     if (server.ipfd_count == 0 && server.sofd < 0) {
1902         serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
1903         exit(1);
1904     }
1905 
1906     /* Create the Redis databases, and initialize other internal state. */
1907     for (j = 0; j < server.dbnum; j++) {
1908         server.db[j].dict = dictCreate(&dbDictType,NULL);
1909         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
1910         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
1911         server.db[j].ready_keys = dictCreate(&setDictType,NULL);
1912         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
1913         server.db[j].eviction_pool = evictionPoolAlloc();
1914         server.db[j].id = j;
1915         server.db[j].avg_ttl = 0;
1916     }
1917     server.pubsub_channels = dictCreate(&keylistDictType,NULL);
1918     server.pubsub_patterns = listCreate();
1919     listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
1920     listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
1921     server.cronloops = 0;
1922     server.rdb_child_pid = -1;
1923     server.aof_child_pid = -1;
1924     server.rdb_child_type = RDB_CHILD_TYPE_NONE;
1925     server.rdb_bgsave_scheduled = 0;
1926     aofRewriteBufferReset();
1927     server.aof_buf = sdsempty();
1928     server.lastsave = time(NULL); /* At startup we consider the DB saved. */
1929     server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
1930     server.rdb_save_time_last = -1;
1931     server.rdb_save_time_start = -1;
1932     server.dirty = 0;
1933     resetServerStats();
1934     /* A few stats we don't want to reset: server startup time, and peak mem. */
1935     server.stat_starttime = time(NULL);
1936     server.stat_peak_memory = 0;
1937     server.resident_set_size = 0;
1938     server.lastbgsave_status = C_OK;
1939     server.aof_last_write_status = C_OK;
1940     server.aof_last_write_errno = 0;
1941     server.repl_good_slaves_count = 0;
1942     updateCachedTime();
1943 
1944     /* Create the serverCron() time event, that's our main way to process
1945      * background operations. */
1946     if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
1947         serverPanic("Can't create the serverCron time event.");
1948         exit(1);
1949     }
1950 
1951     /* Create an event handler for accepting new connections in TCP and Unix
1952      * domain sockets. */
1953     for (j = 0; j < server.ipfd_count; j++) {
1954         if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
1955             acceptTcpHandler,NULL) == AE_ERR)
1956             {
1957                 serverPanic(
1958                     "Unrecoverable error creating server.ipfd file event.");
1959             }
1960     }
1961     if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
1962         acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
1963 
1964     /* Open the AOF file if needed. */
1965     if (server.aof_state == AOF_ON) {
1966         server.aof_fd = open(server.aof_filename,
1967                                O_WRONLY|O_APPEND|O_CREAT,0644);
1968         if (server.aof_fd == -1) {
1969             serverLog(LL_WARNING, "Can't open the append-only file: %s",
1970                 strerror(errno));
1971             exit(1);
1972         }
1973     }
1974 
1975     /* 32 bit instances are limited to 4GB of address space, so if there is
1976      * no explicit limit in the user provided configuration we set a limit
1977      * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
1978      * useless crashes of the Redis instance for out of memory. */
1979     if (server.arch_bits == 32 && server.maxmemory == 0) {
1980         serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
1981         server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
1982         server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
1983     }
1984 
1985     if (server.cluster_enabled) clusterInit();
1986     replicationScriptCacheInit();
1987     scriptingInit(1);
1988     slowlogInit();
1989     latencyMonitorInit();
1990     bioInit();
1991 }
1992 
1993 /* Populates the Redis Command Table starting from the hard coded list
1994  * we have on top of redis.c file. */
populateCommandTable(void)1995 void populateCommandTable(void) {
1996     int j;
1997     int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
1998 
1999     for (j = 0; j < numcommands; j++) {
2000         struct redisCommand *c = redisCommandTable+j;
2001         char *f = c->sflags;
2002         int retval1, retval2;
2003 
2004         while(*f != '\0') {
2005             switch(*f) {
2006             case 'w': c->flags |= CMD_WRITE; break;
2007             case 'r': c->flags |= CMD_READONLY; break;
2008             case 'm': c->flags |= CMD_DENYOOM; break;
2009             case 'a': c->flags |= CMD_ADMIN; break;
2010             case 'p': c->flags |= CMD_PUBSUB; break;
2011             case 's': c->flags |= CMD_NOSCRIPT; break;
2012             case 'R': c->flags |= CMD_RANDOM; break;
2013             case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
2014             case 'l': c->flags |= CMD_LOADING; break;
2015             case 't': c->flags |= CMD_STALE; break;
2016             case 'M': c->flags |= CMD_SKIP_MONITOR; break;
2017             case 'k': c->flags |= CMD_ASKING; break;
2018             case 'F': c->flags |= CMD_FAST; break;
2019             default: serverPanic("Unsupported command flag"); break;
2020             }
2021             f++;
2022         }
2023 
2024         retval1 = dictAdd(server.commands, sdsnew(c->name), c);
2025         /* Populate an additional dictionary that will be unaffected
2026          * by rename-command statements in redis.conf. */
2027         retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
2028         serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
2029     }
2030 }
2031 
resetCommandTableStats(void)2032 void resetCommandTableStats(void) {
2033     int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
2034     int j;
2035 
2036     for (j = 0; j < numcommands; j++) {
2037         struct redisCommand *c = redisCommandTable+j;
2038 
2039         c->microseconds = 0;
2040         c->calls = 0;
2041     }
2042 }
2043 
2044 /* ========================== Redis OP Array API ============================ */
2045 
/* Put 'oa' in the empty state: zero operations and no array allocated. */
void redisOpArrayInit(redisOpArray *oa) {
    oa->numops = 0;
    oa->ops = NULL;
}
2050 
/* Append one operation to 'oa', growing the array by exactly one slot.
 *
 * The 'argv' pointer is stored as it is, it is not copied. The return
 * value is the number of operations in the array after the append. */
int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
                       robj **argv, int argc, int target)
{
    int slot = oa->numops;
    redisOp *entry;

    oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(slot+1));
    entry = &oa->ops[slot];
    entry->cmd = cmd;
    entry->dbid = dbid;
    entry->argv = argv;
    entry->argc = argc;
    entry->target = target;

    oa->numops = slot+1;
    return oa->numops;
}
2066 
/* Release everything stored in 'oa'.
 *
 * For every operation, starting from the last one, the reference count of
 * each argument is decremented and the argument vector itself is freed.
 * Finally the backing array is freed and the pointer is reset to NULL:
 * redisOpArrayAppend() passes oa->ops to zrealloc(), so leaving a stale
 * pointer here would turn a later append on the same array into a
 * use-after-free. After this call 'oa' is empty and can be reused. */
void redisOpArrayFree(redisOpArray *oa) {
    while(oa->numops) {
        int j;
        redisOp *op;

        oa->numops--;
        op = oa->ops+oa->numops;
        for (j = 0; j < op->argc; j++)
            decrRefCount(op->argv[j]);
        zfree(op->argv);
    }
    zfree(oa->ops);
    oa->ops = NULL;
}
2080 
2081 /* ====================== Commands lookup and execution ===================== */
2082 
/* Fetch the command registered under 'name' in the current command table
 * (server.commands). Callers treat a NULL result as "no such command". */
struct redisCommand *lookupCommand(sds name) {
    struct redisCommand *found = dictFetchValue(server.commands, name);

    return found;
}
2086 
lookupCommandByCString(char * s)2087 struct redisCommand *lookupCommandByCString(char *s) {
2088     struct redisCommand *cmd;
2089     sds name = sdsnew(s);
2090 
2091     cmd = dictFetchValue(server.commands, name);
2092     sdsfree(name);
2093     return cmd;
2094 }
2095 
2096 /* Lookup the command in the current table, if not found also check in
2097  * the original table containing the original command names unaffected by
2098  * redis.conf rename-command statement.
2099  *
2100  * This is used by functions rewriting the argument vector such as
2101  * rewriteClientCommandVector() in order to set client->cmd pointer
2102  * correctly even if the command was renamed. */
lookupCommandOrOriginal(sds name)2103 struct redisCommand *lookupCommandOrOriginal(sds name) {
2104     struct redisCommand *cmd = dictFetchValue(server.commands, name);
2105 
2106     if (!cmd) cmd = dictFetchValue(server.orig_commands,name);
2107     return cmd;
2108 }
2109 
2110 /* Propagate the specified command (in the context of the specified database id)
2111  * to AOF and Slaves.
2112  *
2113  * flags are an xor between:
2114  * + PROPAGATE_NONE (no propagation of command at all)
2115  * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
2116  * + PROPAGATE_REPL (propagate into the replication link)
2117  *
2118  * This should not be used inside commands implementation. Use instead
2119  * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
2120  */
propagate(struct redisCommand * cmd,int dbid,robj ** argv,int argc,int flags)2121 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2122                int flags)
2123 {
2124     if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
2125         feedAppendOnlyFile(cmd,dbid,argv,argc);
2126     if (flags & PROPAGATE_REPL)
2127         replicationFeedSlaves(server.slaves,dbid,argv,argc);
2128 }
2129 
2130 /* Used inside commands to schedule the propagation of additional commands
2131  * after the current command is propagated to AOF / Replication.
2132  *
2133  * 'cmd' must be a pointer to the Redis command to replicate, dbid is the
2134  * database ID the command should be propagated into.
2135  * Arguments of the command to propagte are passed as an array of redis
2136  * objects pointers of len 'argc', using the 'argv' vector.
2137  *
2138  * The function does not take a reference to the passed 'argv' vector,
2139  * so it is up to the caller to release the passed argv (but it is usually
2140  * stack allocated).  The function autoamtically increments ref count of
2141  * passed objects, so the caller does not need to. */
alsoPropagate(struct redisCommand * cmd,int dbid,robj ** argv,int argc,int target)2142 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2143                    int target)
2144 {
2145     robj **argvcopy;
2146     int j;
2147 
2148     if (server.loading) return; /* No propagation during loading. */
2149 
2150     argvcopy = zmalloc(sizeof(robj*)*argc);
2151     for (j = 0; j < argc; j++) {
2152         argvcopy[j] = argv[j];
2153         incrRefCount(argv[j]);
2154     }
2155     redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target);
2156 }
2157 
2158 /* It is possible to call the function forceCommandPropagation() inside a
2159  * Redis command implementation in order to to force the propagation of a
2160  * specific command execution into AOF / Replication. */
forceCommandPropagation(client * c,int flags)2161 void forceCommandPropagation(client *c, int flags) {
2162     if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL;
2163     if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
2164 }
2165 
2166 /* Avoid that the executed command is propagated at all. This way we
2167  * are free to just propagate what we want using the alsoPropagate()
2168  * API. */
preventCommandPropagation(client * c)2169 void preventCommandPropagation(client *c) {
2170     c->flags |= CLIENT_PREVENT_PROP;
2171 }
2172 
2173 /* AOF specific version of preventCommandPropagation(). */
preventCommandAOF(client * c)2174 void preventCommandAOF(client *c) {
2175     c->flags |= CLIENT_PREVENT_AOF_PROP;
2176 }
2177 
2178 /* Replication specific version of preventCommandPropagation(). */
preventCommandReplication(client * c)2179 void preventCommandReplication(client *c) {
2180     c->flags |= CLIENT_PREVENT_REPL_PROP;
2181 }
2182 
2183 /* Call() is the core of Redis execution of a command.
2184  *
2185  * The following flags can be passed:
2186  * CMD_CALL_NONE        No flags.
2187  * CMD_CALL_SLOWLOG     Check command speed and log in the slow log if needed.
2188  * CMD_CALL_STATS       Populate command stats.
2189  * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
2190  *                          or if the client flags are forcing propagation.
2191  * CMD_CALL_PROPAGATE_REPL  Send command to salves if it modified the dataset
2192  *                          or if the client flags are forcing propagation.
2193  * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
2194  * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
2195  *
2196  * The exact propagation behavior depends on the client flags.
2197  * Specifically:
2198  *
2199  * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
2200  *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
2201  *    in the call flags, then the command is propagated even if the
2202  *    dataset was not affected by the command.
2203  * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
2204  *    are set, the propagation into AOF or to slaves is not performed even
2205  *    if the command modified the dataset.
2206  *
2207  * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
2208  * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
2209  * slaves propagation will never occur.
2210  *
2211  * Client flags are modified by the implementation of a given command
2212  * using the following API:
2213  *
2214  * forceCommandPropagation(client *c, int flags);
2215  * preventCommandPropagation(client *c);
2216  * preventCommandAOF(client *c);
2217  * preventCommandReplication(client *c);
2218  *
2219  */
call(client * c,int flags)2220 void call(client *c, int flags) {
2221     long long dirty, start, duration;
2222     int client_old_flags = c->flags;
2223 
2224     /* Sent the command to clients in MONITOR mode, only if the commands are
2225      * not generated from reading an AOF. */
2226     if (listLength(server.monitors) &&
2227         !server.loading &&
2228         !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
2229     {
2230         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
2231     }
2232 
2233     /* Initialization: clear the flags that must be set by the command on
2234      * demand, and initialize the array for additional commands propagation. */
2235     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2236     redisOpArrayInit(&server.also_propagate);
2237 
2238     /* Call the command. */
2239     dirty = server.dirty;
2240     start = ustime();
2241     c->cmd->proc(c);
2242     duration = ustime()-start;
2243     dirty = server.dirty-dirty;
2244     if (dirty < 0) dirty = 0;
2245 
2246     /* When EVAL is called loading the AOF we don't want commands called
2247      * from Lua to go into the slowlog or to populate statistics. */
2248     if (server.loading && c->flags & CLIENT_LUA)
2249         flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
2250 
2251     /* If the caller is Lua, we want to force the EVAL caller to propagate
2252      * the script if the command flag or client flag are forcing the
2253      * propagation. */
2254     if (c->flags & CLIENT_LUA && server.lua_caller) {
2255         if (c->flags & CLIENT_FORCE_REPL)
2256             server.lua_caller->flags |= CLIENT_FORCE_REPL;
2257         if (c->flags & CLIENT_FORCE_AOF)
2258             server.lua_caller->flags |= CLIENT_FORCE_AOF;
2259     }
2260 
2261     /* Log the command into the Slow log if needed, and populate the
2262      * per-command statistics that we show in INFO commandstats. */
2263     if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
2264         char *latency_event = (c->cmd->flags & CMD_FAST) ?
2265                               "fast-command" : "command";
2266         latencyAddSampleIfNeeded(latency_event,duration/1000);
2267         slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
2268     }
2269     if (flags & CMD_CALL_STATS) {
2270         c->lastcmd->microseconds += duration;
2271         c->lastcmd->calls++;
2272     }
2273 
2274     /* Propagate the command into the AOF and replication link */
2275     if (flags & CMD_CALL_PROPAGATE &&
2276         (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
2277     {
2278         int propagate_flags = PROPAGATE_NONE;
2279 
2280         /* Check if the command operated changes in the data set. If so
2281          * set for replication / AOF propagation. */
2282         if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
2283 
2284         /* If the client forced AOF / replication of the command, set
2285          * the flags regardless of the command effects on the data set. */
2286         if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
2287         if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
2288 
2289         /* However prevent AOF / replication propagation if the command
2290          * implementatino called preventCommandPropagation() or similar,
2291          * or if we don't have the call() flags to do so. */
2292         if (c->flags & CLIENT_PREVENT_REPL_PROP ||
2293             !(flags & CMD_CALL_PROPAGATE_REPL))
2294                 propagate_flags &= ~PROPAGATE_REPL;
2295         if (c->flags & CLIENT_PREVENT_AOF_PROP ||
2296             !(flags & CMD_CALL_PROPAGATE_AOF))
2297                 propagate_flags &= ~PROPAGATE_AOF;
2298 
2299         /* Call propagate() only if at least one of AOF / replication
2300          * propagation is needed. */
2301         if (propagate_flags != PROPAGATE_NONE)
2302             propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
2303     }
2304 
2305     /* Restore the old replication flags, since call() can be executed
2306      * recursively. */
2307     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2308     c->flags |= client_old_flags &
2309         (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2310 
2311     /* Handle the alsoPropagate() API to handle commands that want to propagate
2312      * multiple separated commands. Note that alsoPropagate() is not affected
2313      * by CLIENT_PREVENT_PROP flag. */
2314     if (server.also_propagate.numops) {
2315         int j;
2316         redisOp *rop;
2317 
2318         if (flags & CMD_CALL_PROPAGATE) {
2319             for (j = 0; j < server.also_propagate.numops; j++) {
2320                 rop = &server.also_propagate.ops[j];
2321                 int target = rop->target;
2322                 /* Whatever the command wish is, we honor the call() flags. */
2323                 if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
2324                 if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
2325                 if (target)
2326                     propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
2327             }
2328         }
2329         redisOpArrayFree(&server.also_propagate);
2330     }
2331     server.stat_numcommands++;
2332 }
2333 
2334 /* If this function gets called we already read a whole
2335  * command, arguments are in the client argv/argc fields.
2336  * processCommand() execute the command or prepare the
2337  * server for a bulk read from the client.
2338  *
2339  * If C_OK is returned the client is still alive and valid and
2340  * other operations can be performed by the caller. Otherwise
2341  * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
processCommand(client * c)2342 int processCommand(client *c) {
2343     /* The QUIT command is handled separately. Normal command procs will
2344      * go through checking for replication and QUIT will cause trouble
2345      * when FORCE_REPLICATION is enabled and would be implemented in
2346      * a regular command proc. */
2347     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
2348         addReply(c,shared.ok);
2349         c->flags |= CLIENT_CLOSE_AFTER_REPLY;
2350         return C_ERR;
2351     }
2352 
2353     /* Now lookup the command and check ASAP about trivial error conditions
2354      * such as wrong arity, bad command name and so forth. */
2355     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
2356     if (!c->cmd) {
2357         flagTransaction(c);
2358         addReplyErrorFormat(c,"unknown command '%s'",
2359             (char*)c->argv[0]->ptr);
2360         return C_OK;
2361     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
2362                (c->argc < -c->cmd->arity)) {
2363         flagTransaction(c);
2364         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2365             c->cmd->name);
2366         return C_OK;
2367     }
2368 
2369     /* Check if the user is authenticated */
2370     if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
2371     {
2372         flagTransaction(c);
2373         addReply(c,shared.noautherr);
2374         return C_OK;
2375     }
2376 
2377     /* If cluster is enabled perform the cluster redirection here.
2378      * However we don't perform the redirection if:
2379      * 1) The sender of this command is our master.
2380      * 2) The command has no key arguments. */
2381     if (server.cluster_enabled &&
2382         !(c->flags & CLIENT_MASTER) &&
2383         !(c->flags & CLIENT_LUA &&
2384           server.lua_caller->flags & CLIENT_MASTER) &&
2385         !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
2386           c->cmd->proc != execCommand))
2387     {
2388         int hashslot;
2389         int error_code;
2390         clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
2391                                         &hashslot,&error_code);
2392         if (n == NULL || n != server.cluster->myself) {
2393             if (c->cmd->proc == execCommand) {
2394                 discardTransaction(c);
2395             } else {
2396                 flagTransaction(c);
2397             }
2398             clusterRedirectClient(c,n,hashslot,error_code);
2399             return C_OK;
2400         }
2401     }
2402 
2403     /* Handle the maxmemory directive.
2404      *
2405      * First we try to free some memory if possible (if there are volatile
2406      * keys in the dataset). If there are not the only thing we can do
2407      * is returning an error. */
2408     if (server.maxmemory) {
2409         int retval = freeMemoryIfNeeded();
2410         /* freeMemoryIfNeeded may flush slave output buffers. This may result
2411          * into a slave, that may be the active client, to be freed. */
2412         if (server.current_client == NULL) return C_ERR;
2413 
2414         /* It was impossible to free enough memory, and the command the client
2415          * is trying to execute is denied during OOM conditions? Error. */
2416         if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
2417             flagTransaction(c);
2418             addReply(c, shared.oomerr);
2419             return C_OK;
2420         }
2421     }
2422 
2423     /* Don't accept write commands if there are problems persisting on disk
2424      * and if this is a master instance. */
2425     if (((server.stop_writes_on_bgsave_err &&
2426           server.saveparamslen > 0 &&
2427           server.lastbgsave_status == C_ERR) ||
2428           server.aof_last_write_status == C_ERR) &&
2429         server.masterhost == NULL &&
2430         (c->cmd->flags & CMD_WRITE ||
2431          c->cmd->proc == pingCommand))
2432     {
2433         flagTransaction(c);
2434         if (server.aof_last_write_status == C_OK)
2435             addReply(c, shared.bgsaveerr);
2436         else
2437             addReplySds(c,
2438                 sdscatprintf(sdsempty(),
2439                 "-MISCONF Errors writing to the AOF file: %s\r\n",
2440                 strerror(server.aof_last_write_errno)));
2441         return C_OK;
2442     }
2443 
2444     /* Don't accept write commands if there are not enough good slaves and
2445      * user configured the min-slaves-to-write option. */
2446     if (server.masterhost == NULL &&
2447         server.repl_min_slaves_to_write &&
2448         server.repl_min_slaves_max_lag &&
2449         c->cmd->flags & CMD_WRITE &&
2450         server.repl_good_slaves_count < server.repl_min_slaves_to_write)
2451     {
2452         flagTransaction(c);
2453         addReply(c, shared.noreplicaserr);
2454         return C_OK;
2455     }
2456 
2457     /* Don't accept write commands if this is a read only slave. But
2458      * accept write commands if this is our master. */
2459     if (server.masterhost && server.repl_slave_ro &&
2460         !(c->flags & CLIENT_MASTER) &&
2461         c->cmd->flags & CMD_WRITE)
2462     {
2463         addReply(c, shared.roslaveerr);
2464         return C_OK;
2465     }
2466 
2467     /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
2468     if (c->flags & CLIENT_PUBSUB &&
2469         c->cmd->proc != pingCommand &&
2470         c->cmd->proc != subscribeCommand &&
2471         c->cmd->proc != unsubscribeCommand &&
2472         c->cmd->proc != psubscribeCommand &&
2473         c->cmd->proc != punsubscribeCommand) {
2474         addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
2475         return C_OK;
2476     }
2477 
2478     /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
2479      * we are a slave with a broken link with master. */
2480     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
2481         server.repl_serve_stale_data == 0 &&
2482         !(c->cmd->flags & CMD_STALE))
2483     {
2484         flagTransaction(c);
2485         addReply(c, shared.masterdownerr);
2486         return C_OK;
2487     }
2488 
2489     /* Loading DB? Return an error if the command has not the
2490      * CMD_LOADING flag. */
2491     if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
2492         addReply(c, shared.loadingerr);
2493         return C_OK;
2494     }
2495 
2496     /* Lua script too slow? Only allow a limited number of commands. */
2497     if (server.lua_timedout &&
2498           c->cmd->proc != authCommand &&
2499           c->cmd->proc != replconfCommand &&
2500         !(c->cmd->proc == shutdownCommand &&
2501           c->argc == 2 &&
2502           tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
2503         !(c->cmd->proc == scriptCommand &&
2504           c->argc == 2 &&
2505           tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
2506     {
2507         flagTransaction(c);
2508         addReply(c, shared.slowscripterr);
2509         return C_OK;
2510     }
2511 
2512     /* Exec the command */
2513     if (c->flags & CLIENT_MULTI &&
2514         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
2515         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
2516     {
2517         queueMultiCommand(c);
2518         addReply(c,shared.queued);
2519     } else {
2520         call(c,CMD_CALL_FULL);
2521         c->woff = server.master_repl_offset;
2522         if (listLength(server.ready_keys))
2523             handleClientsBlockedOnLists();
2524     }
2525     return C_OK;
2526 }
2527 
2528 /*================================== Shutdown =============================== */
2529 
2530 /* Close listening sockets. Also unlink the unix domain socket if
2531  * unlink_unix_socket is non-zero. */
closeListeningSockets(int unlink_unix_socket)2532 void closeListeningSockets(int unlink_unix_socket) {
2533     int j;
2534 
2535     for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]);
2536     if (server.sofd != -1) close(server.sofd);
2537     if (server.cluster_enabled)
2538         for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]);
2539     if (unlink_unix_socket && server.unixsocket) {
2540         serverLog(LL_NOTICE,"Removing the unix socket file.");
2541         unlink(server.unixsocket); /* don't care if this fails */
2542     }
2543 }
2544 
prepareForShutdown(int flags)2545 int prepareForShutdown(int flags) {
2546     int save = flags & SHUTDOWN_SAVE;
2547     int nosave = flags & SHUTDOWN_NOSAVE;
2548 
2549     serverLog(LL_WARNING,"User requested shutdown...");
2550 
2551     /* Kill all the Lua debugger forked sessions. */
2552     ldbKillForkedSessions();
2553 
2554     /* Kill the saving child if there is a background saving in progress.
2555        We want to avoid race conditions, for instance our saving child may
2556        overwrite the synchronous saving did by SHUTDOWN. */
2557     if (server.rdb_child_pid != -1) {
2558         serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
2559         kill(server.rdb_child_pid,SIGUSR1);
2560         rdbRemoveTempFile(server.rdb_child_pid);
2561     }
2562 
2563     if (server.aof_state != AOF_OFF) {
2564         /* Kill the AOF saving child as the AOF we already have may be longer
2565          * but contains the full dataset anyway. */
2566         if (server.aof_child_pid != -1) {
2567             /* If we have AOF enabled but haven't written the AOF yet, don't
2568              * shutdown or else the dataset will be lost. */
2569             if (server.aof_state == AOF_WAIT_REWRITE) {
2570                 serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
2571                 return C_ERR;
2572             }
2573             serverLog(LL_WARNING,
2574                 "There is a child rewriting the AOF. Killing it!");
2575             kill(server.aof_child_pid,SIGUSR1);
2576         }
2577         /* Append only file: fsync() the AOF and exit */
2578         serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
2579         aof_fsync(server.aof_fd);
2580     }
2581 
2582     /* Create a new RDB file before exiting. */
2583     if ((server.saveparamslen > 0 && !nosave) || save) {
2584         serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
2585         /* Snapshotting. Perform a SYNC SAVE and exit */
2586         if (rdbSave(server.rdb_filename) != C_OK) {
2587             /* Ooops.. error saving! The best we can do is to continue
2588              * operating. Note that if there was a background saving process,
2589              * in the next cron() Redis will be notified that the background
2590              * saving aborted, handling special stuff like slaves pending for
2591              * synchronization... */
2592             serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
2593             return C_ERR;
2594         }
2595     }
2596 
2597     /* Remove the pid file if possible and needed. */
2598     if (server.daemonize || server.pidfile) {
2599         serverLog(LL_NOTICE,"Removing the pid file.");
2600         unlink(server.pidfile);
2601     }
2602 
2603     /* Best effort flush of slave output buffers, so that we hopefully
2604      * send them pending writes. */
2605     flushSlavesOutputBuffers();
2606 
2607     /* Close the listening sockets. Apparently this allows faster restarts. */
2608     closeListeningSockets(1);
2609     serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
2610         server.sentinel_mode ? "Sentinel" : "Redis");
2611     return C_OK;
2612 }
2613 
2614 /*================================== Commands =============================== */
2615 
2616 /* Return zero if strings are the same, non-zero if they are not.
2617  * The comparison is performed in a way that prevents an attacker to obtain
2618  * information about the nature of the strings just monitoring the execution
2619  * time of the function.
2620  *
2621  * Note that limiting the comparison length to strings up to 512 bytes we
2622  * can avoid leaking any information about the password length and any
2623  * possible branch misprediction related leak.
2624  */
time_independent_strcmp(char * a,char * b)2625 int time_independent_strcmp(char *a, char *b) {
2626     char bufa[CONFIG_AUTHPASS_MAX_LEN], bufb[CONFIG_AUTHPASS_MAX_LEN];
2627     /* The above two strlen perform len(a) + len(b) operations where either
2628      * a or b are fixed (our password) length, and the difference is only
2629      * relative to the length of the user provided string, so no information
2630      * leak is possible in the following two lines of code. */
2631     unsigned int alen = strlen(a);
2632     unsigned int blen = strlen(b);
2633     unsigned int j;
2634     int diff = 0;
2635 
2636     /* We can't compare strings longer than our static buffers.
2637      * Note that this will never pass the first test in practical circumstances
2638      * so there is no info leak. */
2639     if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;
2640 
2641     memset(bufa,0,sizeof(bufa));        /* Constant time. */
2642     memset(bufb,0,sizeof(bufb));        /* Constant time. */
2643     /* Again the time of the following two copies is proportional to
2644      * len(a) + len(b) so no info is leaked. */
2645     memcpy(bufa,a,alen);
2646     memcpy(bufb,b,blen);
2647 
2648     /* Always compare all the chars in the two buffers without
2649      * conditional expressions. */
2650     for (j = 0; j < sizeof(bufa); j++) {
2651         diff |= (bufa[j] ^ bufb[j]);
2652     }
2653     /* Length must be equal as well. */
2654     diff |= alen ^ blen;
2655     return diff; /* If zero strings are the same. */
2656 }
2657 
/* AUTH <password>
 *
 * Replies with an error if no password is configured. Otherwise the
 * supplied password is checked against server.requirepass with the time
 * independent comparison, and the client 'authenticated' flag is set or
 * cleared according to the outcome. */
void authCommand(client *c) {
    if (!server.requirepass) {
        addReplyError(c,"Client sent AUTH, but no password is set");
        return;
    }

    if (time_independent_strcmp(c->argv[1]->ptr, server.requirepass) == 0) {
        c->authenticated = 1;
        addReply(c,shared.ok);
    } else {
        c->authenticated = 0;
        addReplyError(c,"invalid password");
    }
}
2669 
2670 /* The PING command. It works in a different way if the client is in
2671  * in Pub/Sub mode. */
pingCommand(client * c)2672 void pingCommand(client *c) {
2673     /* The command takes zero or one arguments. */
2674     if (c->argc > 2) {
2675         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2676             c->cmd->name);
2677         return;
2678     }
2679 
2680     if (c->flags & CLIENT_PUBSUB) {
2681         addReply(c,shared.mbulkhdr[2]);
2682         addReplyBulkCBuffer(c,"pong",4);
2683         if (c->argc == 1)
2684             addReplyBulkCBuffer(c,"",0);
2685         else
2686             addReplyBulk(c,c->argv[1]);
2687     } else {
2688         if (c->argc == 1)
2689             addReply(c,shared.pong);
2690         else
2691             addReplyBulk(c,c->argv[1]);
2692     }
2693 }
2694 
/* ECHO <message>: reply with the first argument as a bulk string. */
void echoCommand(client *c) {
    robj *message = c->argv[1];

    addReplyBulk(c,message);
}
2698 
/* TIME: reply with a two elements multi bulk holding the seconds and the
 * microseconds fields of the current time as reported by gettimeofday(). */
void timeCommand(client *c) {
    struct timeval now;

    /* The return value is not checked: gettimeofday() can only fail if
     * the address of the timeval is bad. */
    gettimeofday(&now,NULL);
    addReplyMultiBulkLen(c,2);
    addReplyBulkLongLong(c,now.tv_sec);
    addReplyBulkLongLong(c,now.tv_usec);
}
2709 
2710 /* Helper function for addReplyCommand() to output flags. */
addReplyCommandFlag(client * c,struct redisCommand * cmd,int f,char * reply)2711 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) {
2712     if (cmd->flags & f) {
2713         addReplyStatus(c, reply);
2714         return 1;
2715     }
2716     return 0;
2717 }
2718 
2719 /* Output the representation of a Redis command. Used by the COMMAND command. */
addReplyCommand(client * c,struct redisCommand * cmd)2720 void addReplyCommand(client *c, struct redisCommand *cmd) {
2721     if (!cmd) {
2722         addReply(c, shared.nullbulk);
2723     } else {
2724         /* We are adding: command name, arg count, flags, first, last, offset */
2725         addReplyMultiBulkLen(c, 6);
2726         addReplyBulkCString(c, cmd->name);
2727         addReplyLongLong(c, cmd->arity);
2728 
2729         int flagcount = 0;
2730         void *flaglen = addDeferredMultiBulkLength(c);
2731         flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write");
2732         flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly");
2733         flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom");
2734         flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin");
2735         flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub");
2736         flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript");
2737         flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random");
2738         flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script");
2739         flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading");
2740         flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale");
2741         flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor");
2742         flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
2743         flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
2744         if (cmd->getkeys_proc) {
2745             addReplyStatus(c, "movablekeys");
2746             flagcount += 1;
2747         }
2748         setDeferredMultiBulkLength(c, flaglen, flagcount);
2749 
2750         addReplyLongLong(c, cmd->firstkey);
2751         addReplyLongLong(c, cmd->lastkey);
2752         addReplyLongLong(c, cmd->keystep);
2753     }
2754 }
2755 
2756 /* COMMAND <subcommand> <args> */
commandCommand(client * c)2757 void commandCommand(client *c) {
2758     dictIterator *di;
2759     dictEntry *de;
2760 
2761     if (c->argc == 1) {
2762         addReplyMultiBulkLen(c, dictSize(server.commands));
2763         di = dictGetIterator(server.commands);
2764         while ((de = dictNext(di)) != NULL) {
2765             addReplyCommand(c, dictGetVal(de));
2766         }
2767         dictReleaseIterator(di);
2768     } else if (!strcasecmp(c->argv[1]->ptr, "info")) {
2769         int i;
2770         addReplyMultiBulkLen(c, c->argc-2);
2771         for (i = 2; i < c->argc; i++) {
2772             addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr));
2773         }
2774     } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) {
2775         addReplyLongLong(c, dictSize(server.commands));
2776     } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) {
2777         struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
2778         int *keys, numkeys, j;
2779 
2780         if (!cmd) {
2781             addReplyErrorFormat(c,"Invalid command specified");
2782             return;
2783         } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) ||
2784                    ((c->argc-2) < -cmd->arity))
2785         {
2786             addReplyError(c,"Invalid number of arguments specified for command");
2787             return;
2788         }
2789 
2790         keys = getKeysFromCommand(cmd,c->argv+2,c->argc-2,&numkeys);
2791         addReplyMultiBulkLen(c,numkeys);
2792         for (j = 0; j < numkeys; j++) addReplyBulk(c,c->argv[keys[j]+2]);
2793         getKeysFreeResult(keys);
2794     } else {
2795         addReplyError(c, "Unknown subcommand or wrong number of arguments.");
2796         return;
2797     }
2798 }
2799 
/* Convert an amount of bytes into a human readable string in the form
 * of 100B, 2G, 100M, 4K, and so forth.
 *
 * 's' receives the NUL terminated result and must be large enough to hold
 * it (no length is passed in). Values below 1024 are printed as an integer
 * followed by 'B'; larger values are divided by the biggest power of 1024
 * not exceeding them and printed with two decimals followed by one of
 * K, M, G, T, P. Values of 2^60 or more fall back to the plain byte count. */
void bytesToHuman(char *s, unsigned long long n) {
    static const char suffixes[] = "KMGTP";
    unsigned long long unit = 1024;
    int i;

    if (n < unit) {
        /* Bytes */
        sprintf(s,"%lluB",n);
        return;
    }

    /* 'unit' walks 1024, 1024^2, ... 1024^5: the first unit whose next
     * power is strictly greater than 'n' is the one to use. */
    for (i = 0; suffixes[i] != '\0'; i++) {
        if (n < unit*1024) {
            sprintf(s,"%.2f%c",(double)n/unit,suffixes[i]);
            return;
        }
        unit *= 1024;
    }

    /* Let's hope we never need this */
    sprintf(s,"%lluB",n);
}
2829 
2830 /* Create the string returned by the INFO command. This is decoupled
2831  * by the INFO command itself as we need to report the same information
2832  * on memory corruption problems. */
genRedisInfoString(char * section)2833 sds genRedisInfoString(char *section) {
2834     sds info = sdsempty();
2835     time_t uptime = server.unixtime-server.stat_starttime;
2836     int j, numcommands;
2837     struct rusage self_ru, c_ru;
2838     unsigned long lol, bib;
2839     int allsections = 0, defsections = 0;
2840     int sections = 0;
2841 
2842     if (section == NULL) section = "default";
2843     allsections = strcasecmp(section,"all") == 0;
2844     defsections = strcasecmp(section,"default") == 0;
2845 
2846     getrusage(RUSAGE_SELF, &self_ru);
2847     getrusage(RUSAGE_CHILDREN, &c_ru);
2848     getClientsMaxBuffers(&lol,&bib);
2849 
2850     /* Server */
2851     if (allsections || defsections || !strcasecmp(section,"server")) {
2852         static int call_uname = 1;
2853         static struct utsname name;
2854         char *mode;
2855 
2856         if (server.cluster_enabled) mode = "cluster";
2857         else if (server.sentinel_mode) mode = "sentinel";
2858         else mode = "standalone";
2859 
2860         if (sections++) info = sdscat(info,"\r\n");
2861 
2862         if (call_uname) {
2863             /* Uname can be slow and is always the same output. Cache it. */
2864             uname(&name);
2865             call_uname = 0;
2866         }
2867 
2868         info = sdscatprintf(info,
2869             "# Server\r\n"
2870             "redis_version:%s\r\n"
2871             "redis_git_sha1:%s\r\n"
2872             "redis_git_dirty:%d\r\n"
2873             "redis_build_id:%llx\r\n"
2874             "redis_mode:%s\r\n"
2875             "os:%s %s %s\r\n"
2876             "arch_bits:%d\r\n"
2877             "multiplexing_api:%s\r\n"
2878             "gcc_version:%d.%d.%d\r\n"
2879             "process_id:%ld\r\n"
2880             "run_id:%s\r\n"
2881             "tcp_port:%d\r\n"
2882             "uptime_in_seconds:%jd\r\n"
2883             "uptime_in_days:%jd\r\n"
2884             "hz:%d\r\n"
2885             "lru_clock:%ld\r\n"
2886             "executable:%s\r\n"
2887             "config_file:%s\r\n",
2888             REDIS_VERSION,
2889             redisGitSHA1(),
2890             strtol(redisGitDirty(),NULL,10) > 0,
2891             (unsigned long long) redisBuildId(),
2892             mode,
2893             name.sysname, name.release, name.machine,
2894             server.arch_bits,
2895             aeGetApiName(),
2896 #ifdef __GNUC__
2897             __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
2898 #else
2899             0,0,0,
2900 #endif
2901             (long) getpid(),
2902             server.runid,
2903             server.port,
2904             (intmax_t)uptime,
2905             (intmax_t)(uptime/(3600*24)),
2906             server.hz,
2907             (unsigned long) server.lruclock,
2908             server.executable ? server.executable : "",
2909             server.configfile ? server.configfile : "");
2910     }
2911 
2912     /* Clients */
2913     if (allsections || defsections || !strcasecmp(section,"clients")) {
2914         if (sections++) info = sdscat(info,"\r\n");
2915         info = sdscatprintf(info,
2916             "# Clients\r\n"
2917             "connected_clients:%lu\r\n"
2918             "client_longest_output_list:%lu\r\n"
2919             "client_biggest_input_buf:%lu\r\n"
2920             "blocked_clients:%d\r\n",
2921             listLength(server.clients)-listLength(server.slaves),
2922             lol, bib,
2923             server.bpop_blocked_clients);
2924     }
2925 
2926     /* Memory */
2927     if (allsections || defsections || !strcasecmp(section,"memory")) {
2928         char hmem[64];
2929         char peak_hmem[64];
2930         char total_system_hmem[64];
2931         char used_memory_lua_hmem[64];
2932         char used_memory_rss_hmem[64];
2933         char maxmemory_hmem[64];
2934         size_t zmalloc_used = zmalloc_used_memory();
2935         size_t total_system_mem = server.system_memory_size;
2936         const char *evict_policy = evictPolicyToString();
2937         long long memory_lua = (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024;
2938 
2939         /* Peak memory is updated from time to time by serverCron() so it
2940          * may happen that the instantaneous value is slightly bigger than
2941          * the peak value. This may confuse users, so we update the peak
2942          * if found smaller than the current memory usage. */
2943         if (zmalloc_used > server.stat_peak_memory)
2944             server.stat_peak_memory = zmalloc_used;
2945 
2946         bytesToHuman(hmem,zmalloc_used);
2947         bytesToHuman(peak_hmem,server.stat_peak_memory);
2948         bytesToHuman(total_system_hmem,total_system_mem);
2949         bytesToHuman(used_memory_lua_hmem,memory_lua);
2950         bytesToHuman(used_memory_rss_hmem,server.resident_set_size);
2951         bytesToHuman(maxmemory_hmem,server.maxmemory);
2952 
2953         if (sections++) info = sdscat(info,"\r\n");
2954         info = sdscatprintf(info,
2955             "# Memory\r\n"
2956             "used_memory:%zu\r\n"
2957             "used_memory_human:%s\r\n"
2958             "used_memory_rss:%zu\r\n"
2959             "used_memory_rss_human:%s\r\n"
2960             "used_memory_peak:%zu\r\n"
2961             "used_memory_peak_human:%s\r\n"
2962             "total_system_memory:%lu\r\n"
2963             "total_system_memory_human:%s\r\n"
2964             "used_memory_lua:%lld\r\n"
2965             "used_memory_lua_human:%s\r\n"
2966             "maxmemory:%lld\r\n"
2967             "maxmemory_human:%s\r\n"
2968             "maxmemory_policy:%s\r\n"
2969             "mem_fragmentation_ratio:%.2f\r\n"
2970             "mem_allocator:%s\r\n",
2971             zmalloc_used,
2972             hmem,
2973             server.resident_set_size,
2974             used_memory_rss_hmem,
2975             server.stat_peak_memory,
2976             peak_hmem,
2977             (unsigned long)total_system_mem,
2978             total_system_hmem,
2979             memory_lua,
2980             used_memory_lua_hmem,
2981             server.maxmemory,
2982             maxmemory_hmem,
2983             evict_policy,
2984             zmalloc_get_fragmentation_ratio(server.resident_set_size),
2985             ZMALLOC_LIB
2986             );
2987     }
2988 
2989     /* Persistence */
2990     if (allsections || defsections || !strcasecmp(section,"persistence")) {
2991         if (sections++) info = sdscat(info,"\r\n");
2992         info = sdscatprintf(info,
2993             "# Persistence\r\n"
2994             "loading:%d\r\n"
2995             "rdb_changes_since_last_save:%lld\r\n"
2996             "rdb_bgsave_in_progress:%d\r\n"
2997             "rdb_last_save_time:%jd\r\n"
2998             "rdb_last_bgsave_status:%s\r\n"
2999             "rdb_last_bgsave_time_sec:%jd\r\n"
3000             "rdb_current_bgsave_time_sec:%jd\r\n"
3001             "aof_enabled:%d\r\n"
3002             "aof_rewrite_in_progress:%d\r\n"
3003             "aof_rewrite_scheduled:%d\r\n"
3004             "aof_last_rewrite_time_sec:%jd\r\n"
3005             "aof_current_rewrite_time_sec:%jd\r\n"
3006             "aof_last_bgrewrite_status:%s\r\n"
3007             "aof_last_write_status:%s\r\n",
3008             server.loading,
3009             server.dirty,
3010             server.rdb_child_pid != -1,
3011             (intmax_t)server.lastsave,
3012             (server.lastbgsave_status == C_OK) ? "ok" : "err",
3013             (intmax_t)server.rdb_save_time_last,
3014             (intmax_t)((server.rdb_child_pid == -1) ?
3015                 -1 : time(NULL)-server.rdb_save_time_start),
3016             server.aof_state != AOF_OFF,
3017             server.aof_child_pid != -1,
3018             server.aof_rewrite_scheduled,
3019             (intmax_t)server.aof_rewrite_time_last,
3020             (intmax_t)((server.aof_child_pid == -1) ?
3021                 -1 : time(NULL)-server.aof_rewrite_time_start),
3022             (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
3023             (server.aof_last_write_status == C_OK) ? "ok" : "err");
3024 
3025         if (server.aof_state != AOF_OFF) {
3026             info = sdscatprintf(info,
3027                 "aof_current_size:%lld\r\n"
3028                 "aof_base_size:%lld\r\n"
3029                 "aof_pending_rewrite:%d\r\n"
3030                 "aof_buffer_length:%zu\r\n"
3031                 "aof_rewrite_buffer_length:%lu\r\n"
3032                 "aof_pending_bio_fsync:%llu\r\n"
3033                 "aof_delayed_fsync:%lu\r\n",
3034                 (long long) server.aof_current_size,
3035                 (long long) server.aof_rewrite_base_size,
3036                 server.aof_rewrite_scheduled,
3037                 sdslen(server.aof_buf),
3038                 aofRewriteBufferSize(),
3039                 bioPendingJobsOfType(BIO_AOF_FSYNC),
3040                 server.aof_delayed_fsync);
3041         }
3042 
3043         if (server.loading) {
3044             double perc;
3045             time_t eta, elapsed;
3046             off_t remaining_bytes = server.loading_total_bytes-
3047                                     server.loading_loaded_bytes;
3048 
3049             perc = ((double)server.loading_loaded_bytes /
3050                    (server.loading_total_bytes+1)) * 100;
3051 
3052             elapsed = time(NULL)-server.loading_start_time;
3053             if (elapsed == 0) {
3054                 eta = 1; /* A fake 1 second figure if we don't have
3055                             enough info */
3056             } else {
3057                 eta = (elapsed*remaining_bytes)/(server.loading_loaded_bytes+1);
3058             }
3059 
3060             info = sdscatprintf(info,
3061                 "loading_start_time:%jd\r\n"
3062                 "loading_total_bytes:%llu\r\n"
3063                 "loading_loaded_bytes:%llu\r\n"
3064                 "loading_loaded_perc:%.2f\r\n"
3065                 "loading_eta_seconds:%jd\r\n",
3066                 (intmax_t) server.loading_start_time,
3067                 (unsigned long long) server.loading_total_bytes,
3068                 (unsigned long long) server.loading_loaded_bytes,
3069                 perc,
3070                 (intmax_t)eta
3071             );
3072         }
3073     }
3074 
3075     /* Stats */
3076     if (allsections || defsections || !strcasecmp(section,"stats")) {
3077         if (sections++) info = sdscat(info,"\r\n");
3078         info = sdscatprintf(info,
3079             "# Stats\r\n"
3080             "total_connections_received:%lld\r\n"
3081             "total_commands_processed:%lld\r\n"
3082             "instantaneous_ops_per_sec:%lld\r\n"
3083             "total_net_input_bytes:%lld\r\n"
3084             "total_net_output_bytes:%lld\r\n"
3085             "instantaneous_input_kbps:%.2f\r\n"
3086             "instantaneous_output_kbps:%.2f\r\n"
3087             "rejected_connections:%lld\r\n"
3088             "sync_full:%lld\r\n"
3089             "sync_partial_ok:%lld\r\n"
3090             "sync_partial_err:%lld\r\n"
3091             "expired_keys:%lld\r\n"
3092             "evicted_keys:%lld\r\n"
3093             "keyspace_hits:%lld\r\n"
3094             "keyspace_misses:%lld\r\n"
3095             "pubsub_channels:%ld\r\n"
3096             "pubsub_patterns:%lu\r\n"
3097             "latest_fork_usec:%lld\r\n"
3098             "migrate_cached_sockets:%ld\r\n",
3099             server.stat_numconnections,
3100             server.stat_numcommands,
3101             getInstantaneousMetric(STATS_METRIC_COMMAND),
3102             server.stat_net_input_bytes,
3103             server.stat_net_output_bytes,
3104             (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
3105             (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
3106             server.stat_rejected_conn,
3107             server.stat_sync_full,
3108             server.stat_sync_partial_ok,
3109             server.stat_sync_partial_err,
3110             server.stat_expiredkeys,
3111             server.stat_evictedkeys,
3112             server.stat_keyspace_hits,
3113             server.stat_keyspace_misses,
3114             dictSize(server.pubsub_channels),
3115             listLength(server.pubsub_patterns),
3116             server.stat_fork_time,
3117             dictSize(server.migrate_cached_sockets));
3118     }
3119 
3120     /* Replication */
3121     if (allsections || defsections || !strcasecmp(section,"replication")) {
3122         if (sections++) info = sdscat(info,"\r\n");
3123         info = sdscatprintf(info,
3124             "# Replication\r\n"
3125             "role:%s\r\n",
3126             server.masterhost == NULL ? "master" : "slave");
3127         if (server.masterhost) {
3128             long long slave_repl_offset = 1;
3129 
3130             if (server.master)
3131                 slave_repl_offset = server.master->reploff;
3132             else if (server.cached_master)
3133                 slave_repl_offset = server.cached_master->reploff;
3134 
3135             info = sdscatprintf(info,
3136                 "master_host:%s\r\n"
3137                 "master_port:%d\r\n"
3138                 "master_link_status:%s\r\n"
3139                 "master_last_io_seconds_ago:%d\r\n"
3140                 "master_sync_in_progress:%d\r\n"
3141                 "slave_repl_offset:%lld\r\n"
3142                 ,server.masterhost,
3143                 server.masterport,
3144                 (server.repl_state == REPL_STATE_CONNECTED) ?
3145                     "up" : "down",
3146                 server.master ?
3147                 ((int)(server.unixtime-server.master->lastinteraction)) : -1,
3148                 server.repl_state == REPL_STATE_TRANSFER,
3149                 slave_repl_offset
3150             );
3151 
3152             if (server.repl_state == REPL_STATE_TRANSFER) {
3153                 info = sdscatprintf(info,
3154                     "master_sync_left_bytes:%lld\r\n"
3155                     "master_sync_last_io_seconds_ago:%d\r\n"
3156                     , (long long)
3157                         (server.repl_transfer_size - server.repl_transfer_read),
3158                     (int)(server.unixtime-server.repl_transfer_lastio)
3159                 );
3160             }
3161 
3162             if (server.repl_state != REPL_STATE_CONNECTED) {
3163                 info = sdscatprintf(info,
3164                     "master_link_down_since_seconds:%jd\r\n",
3165                     (intmax_t)server.unixtime-server.repl_down_since);
3166             }
3167             info = sdscatprintf(info,
3168                 "slave_priority:%d\r\n"
3169                 "slave_read_only:%d\r\n",
3170                 server.slave_priority,
3171                 server.repl_slave_ro);
3172         }
3173 
3174         info = sdscatprintf(info,
3175             "connected_slaves:%lu\r\n",
3176             listLength(server.slaves));
3177 
3178         /* If min-slaves-to-write is active, write the number of slaves
3179          * currently considered 'good'. */
3180         if (server.repl_min_slaves_to_write &&
3181             server.repl_min_slaves_max_lag) {
3182             info = sdscatprintf(info,
3183                 "min_slaves_good_slaves:%d\r\n",
3184                 server.repl_good_slaves_count);
3185         }
3186 
3187         if (listLength(server.slaves)) {
3188             int slaveid = 0;
3189             listNode *ln;
3190             listIter li;
3191 
3192             listRewind(server.slaves,&li);
3193             while((ln = listNext(&li))) {
3194                 client *slave = listNodeValue(ln);
3195                 char *state = NULL;
3196                 char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
3197                 int port;
3198                 long lag = 0;
3199 
3200                 if (slaveip[0] == '\0') {
3201                     if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1)
3202                         continue;
3203                     slaveip = ip;
3204                 }
3205                 switch(slave->replstate) {
3206                 case SLAVE_STATE_WAIT_BGSAVE_START:
3207                 case SLAVE_STATE_WAIT_BGSAVE_END:
3208                     state = "wait_bgsave";
3209                     break;
3210                 case SLAVE_STATE_SEND_BULK:
3211                     state = "send_bulk";
3212                     break;
3213                 case SLAVE_STATE_ONLINE:
3214                     state = "online";
3215                     break;
3216                 }
3217                 if (state == NULL) continue;
3218                 if (slave->replstate == SLAVE_STATE_ONLINE)
3219                     lag = time(NULL) - slave->repl_ack_time;
3220 
3221                 info = sdscatprintf(info,
3222                     "slave%d:ip=%s,port=%d,state=%s,"
3223                     "offset=%lld,lag=%ld\r\n",
3224                     slaveid,slaveip,slave->slave_listening_port,state,
3225                     slave->repl_ack_off, lag);
3226                 slaveid++;
3227             }
3228         }
3229         info = sdscatprintf(info,
3230             "master_repl_offset:%lld\r\n"
3231             "repl_backlog_active:%d\r\n"
3232             "repl_backlog_size:%lld\r\n"
3233             "repl_backlog_first_byte_offset:%lld\r\n"
3234             "repl_backlog_histlen:%lld\r\n",
3235             server.master_repl_offset,
3236             server.repl_backlog != NULL,
3237             server.repl_backlog_size,
3238             server.repl_backlog_off,
3239             server.repl_backlog_histlen);
3240     }
3241 
3242     /* CPU */
3243     if (allsections || defsections || !strcasecmp(section,"cpu")) {
3244         if (sections++) info = sdscat(info,"\r\n");
3245         info = sdscatprintf(info,
3246         "# CPU\r\n"
3247         "used_cpu_sys:%.2f\r\n"
3248         "used_cpu_user:%.2f\r\n"
3249         "used_cpu_sys_children:%.2f\r\n"
3250         "used_cpu_user_children:%.2f\r\n",
3251         (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000,
3252         (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000,
3253         (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
3254         (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000);
3255     }
3256 
3257     /* cmdtime */
3258     if (allsections || !strcasecmp(section,"commandstats")) {
3259         if (sections++) info = sdscat(info,"\r\n");
3260         info = sdscatprintf(info, "# Commandstats\r\n");
3261         numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
3262         for (j = 0; j < numcommands; j++) {
3263             struct redisCommand *c = redisCommandTable+j;
3264 
3265             if (!c->calls) continue;
3266             info = sdscatprintf(info,
3267                 "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n",
3268                 c->name, c->calls, c->microseconds,
3269                 (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls));
3270         }
3271     }
3272 
3273     /* Cluster */
3274     if (allsections || defsections || !strcasecmp(section,"cluster")) {
3275         if (sections++) info = sdscat(info,"\r\n");
3276         info = sdscatprintf(info,
3277         "# Cluster\r\n"
3278         "cluster_enabled:%d\r\n",
3279         server.cluster_enabled);
3280     }
3281 
3282     /* Key space */
3283     if (allsections || defsections || !strcasecmp(section,"keyspace")) {
3284         if (sections++) info = sdscat(info,"\r\n");
3285         info = sdscatprintf(info, "# Keyspace\r\n");
3286         for (j = 0; j < server.dbnum; j++) {
3287             long long keys, vkeys;
3288 
3289             keys = dictSize(server.db[j].dict);
3290             vkeys = dictSize(server.db[j].expires);
3291             if (keys || vkeys) {
3292                 info = sdscatprintf(info,
3293                     "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
3294                     j, keys, vkeys, server.db[j].avg_ttl);
3295             }
3296         }
3297     }
3298     return info;
3299 }
3300 
/* INFO [section]
 *
 * Replies with the string produced by genRedisInfoString() for the given
 * section, or for "default" when no section is given. More than one
 * argument is a syntax error. */
void infoCommand(client *c) {
    char *section;

    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    }

    if (c->argc == 2)
        section = c->argv[1]->ptr;
    else
        section = "default";

    addReplyBulkSds(c, genRedisInfoString(section));
}
3310 
/* MONITOR
 *
 * Flags the client as CLIENT_SLAVE|CLIENT_MONITOR, appends it to
 * server.monitors and replies with OK. A client that already carries the
 * CLIENT_SLAVE flag (a slave, or a client already in monitor mode) is left
 * untouched and gets no reply. */
void monitorCommand(client *c) {
    if (!(c->flags & CLIENT_SLAVE)) {
        c->flags = c->flags | CLIENT_SLAVE | CLIENT_MONITOR;
        listAddNodeTail(server.monitors,c);
        addReply(c,shared.ok);
    }
}
3319 
3320 /* ============================ Maxmemory directive  ======================== */
3321 
3322 /* freeMemoryIfNeeded() gets called when 'maxmemory' is set on the config
3323  * file to limit the max memory used by the server, before processing a
3324  * command.
3325  *
3326  * The goal of the function is to free enough memory to keep Redis under the
3327  * configured memory limit.
3328  *
 * The function starts by calculating how many bytes should be freed to keep
 * Redis under the limit, and enters a loop selecting the best keys to
 * evict according to the configured policy.
3332  *
3333  * If all the bytes needed to return back under the limit were freed the
3334  * function returns C_OK, otherwise C_ERR is returned, and the caller
3335  * should block the execution of commands that will result in more memory
3336  * used by the server.
3337  *
3338  * ------------------------------------------------------------------------
3339  *
3340  * LRU approximation algorithm
3341  *
 * Redis uses an approximation of the LRU algorithm that runs in constant
 * memory. Every time a key must be evicted, we sample N keys (with
 * N very small, usually around 5) to populate a pool of the M best
 * candidates for eviction (M is defined by MAXMEMORY_EVICTION_POOL_SIZE).
3346  *
3347  * The N keys sampled are added in the pool of good keys to expire (the one
3348  * with an old access time) if they are better than one of the current keys
3349  * in the pool.
3350  *
3351  * After the pool is populated, the best key we have in the pool is expired.
3352  * However note that we don't remove keys from the pool when they are deleted
3353  * so the pool may contain keys that no longer exist.
3354  *
3355  * When we try to evict a key, and all the entries in the pool don't exist
3356  * we populate it again. This time we'll be sure that the pool has at least
3357  * one key that can be evicted, if there is at least one key that can be
3358  * evicted in the whole database. */
3359 
3360 /* Create a new eviction pool. */
evictionPoolAlloc(void)3361 struct evictionPoolEntry *evictionPoolAlloc(void) {
3362     struct evictionPoolEntry *ep;
3363     int j;
3364 
3365     ep = zmalloc(sizeof(*ep)*MAXMEMORY_EVICTION_POOL_SIZE);
3366     for (j = 0; j < MAXMEMORY_EVICTION_POOL_SIZE; j++) {
3367         ep[j].idle = 0;
3368         ep[j].key = NULL;
3369     }
3370     return ep;
3371 }
3372 
3373 /* This is an helper function for freeMemoryIfNeeded(), it is used in order
3374  * to populate the evictionPool with a few entries every time we want to
3375  * expire a key. Keys with idle time smaller than one of the current
3376  * keys are added. Keys are always added if there are free entries.
3377  *
3378  * We insert keys on place in ascending order, so keys with the smaller
3379  * idle time are on the left, and keys with the higher idle time on the
3380  * right. */
3381 
3382 #define EVICTION_SAMPLES_ARRAY_SIZE 16
evictionPoolPopulate(dict * sampledict,dict * keydict,struct evictionPoolEntry * pool)3383 void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
3384     int j, k, count;
3385     dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
3386     dictEntry **samples;
3387 
3388     /* Try to use a static buffer: this function is a big hit...
3389      * Note: it was actually measured that this helps. */
3390     if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
3391         samples = _samples;
3392     } else {
3393         samples = zmalloc(sizeof(samples[0])*server.maxmemory_samples);
3394     }
3395 
3396     count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
3397     for (j = 0; j < count; j++) {
3398         unsigned long long idle;
3399         sds key;
3400         robj *o;
3401         dictEntry *de;
3402 
3403         de = samples[j];
3404         key = dictGetKey(de);
3405         /* If the dictionary we are sampling from is not the main
3406          * dictionary (but the expires one) we need to lookup the key
3407          * again in the key dictionary to obtain the value object. */
3408         if (sampledict != keydict) de = dictFind(keydict, key);
3409         o = dictGetVal(de);
3410         idle = estimateObjectIdleTime(o);
3411 
3412         /* Insert the element inside the pool.
3413          * First, find the first empty bucket or the first populated
3414          * bucket that has an idle time smaller than our idle time. */
3415         k = 0;
3416         while (k < MAXMEMORY_EVICTION_POOL_SIZE &&
3417                pool[k].key &&
3418                pool[k].idle < idle) k++;
3419         if (k == 0 && pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key != NULL) {
3420             /* Can't insert if the element is < the worst element we have
3421              * and there are no empty buckets. */
3422             continue;
3423         } else if (k < MAXMEMORY_EVICTION_POOL_SIZE && pool[k].key == NULL) {
3424             /* Inserting into empty position. No setup needed before insert. */
3425         } else {
3426             /* Inserting in the middle. Now k points to the first element
3427              * greater than the element to insert.  */
3428             if (pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key == NULL) {
3429                 /* Free space on the right? Insert at k shifting
3430                  * all the elements from k to end to the right. */
3431                 memmove(pool+k+1,pool+k,
3432                     sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
3433             } else {
3434                 /* No free space on right? Insert at k-1 */
3435                 k--;
3436                 /* Shift all elements on the left of k (included) to the
3437                  * left, so we discard the element with smaller idle time. */
3438                 sdsfree(pool[0].key);
3439                 memmove(pool,pool+1,sizeof(pool[0])*k);
3440             }
3441         }
3442         pool[k].key = sdsdup(key);
3443         pool[k].idle = idle;
3444     }
3445     if (samples != _samples) zfree(samples);
3446 }
3447 
freeMemoryIfNeeded(void)3448 int freeMemoryIfNeeded(void) {
3449     size_t mem_used, mem_tofree, mem_freed;
3450     int slaves = listLength(server.slaves);
3451     mstime_t latency, eviction_latency;
3452 
3453     /* Remove the size of slaves output buffers and AOF buffer from the
3454      * count of used memory. */
3455     mem_used = zmalloc_used_memory();
3456     if (slaves) {
3457         listIter li;
3458         listNode *ln;
3459 
3460         listRewind(server.slaves,&li);
3461         while((ln = listNext(&li))) {
3462             client *slave = listNodeValue(ln);
3463             unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
3464             if (obuf_bytes > mem_used)
3465                 mem_used = 0;
3466             else
3467                 mem_used -= obuf_bytes;
3468         }
3469     }
3470     if (server.aof_state != AOF_OFF) {
3471         mem_used -= sdslen(server.aof_buf);
3472         mem_used -= aofRewriteBufferSize();
3473     }
3474 
3475     /* Check if we are over the memory limit. */
3476     if (mem_used <= server.maxmemory) return C_OK;
3477 
3478     if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
3479         return C_ERR; /* We need to free memory, but policy forbids. */
3480 
3481     /* Compute how much memory we need to free. */
3482     mem_tofree = mem_used - server.maxmemory;
3483     mem_freed = 0;
3484     latencyStartMonitor(latency);
3485     while (mem_freed < mem_tofree) {
3486         int j, k, keys_freed = 0;
3487 
3488         for (j = 0; j < server.dbnum; j++) {
3489             long bestval = 0; /* just to prevent warning */
3490             sds bestkey = NULL;
3491             dictEntry *de;
3492             redisDb *db = server.db+j;
3493             dict *dict;
3494 
3495             if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
3496                 server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
3497             {
3498                 dict = server.db[j].dict;
3499             } else {
3500                 dict = server.db[j].expires;
3501             }
3502             if (dictSize(dict) == 0) continue;
3503 
3504             /* volatile-random and allkeys-random policy */
3505             if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
3506                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
3507             {
3508                 de = dictGetRandomKey(dict);
3509                 bestkey = dictGetKey(de);
3510             }
3511 
3512             /* volatile-lru and allkeys-lru policy */
3513             else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
3514                 server.maxmemory_policy == MAXMEMORY_VOLATILE_LRU)
3515             {
3516                 struct evictionPoolEntry *pool = db->eviction_pool;
3517 
3518                 while(bestkey == NULL) {
3519                     evictionPoolPopulate(dict, db->dict, db->eviction_pool);
3520                     /* Go backward from best to worst element to evict. */
3521                     for (k = MAXMEMORY_EVICTION_POOL_SIZE-1; k >= 0; k--) {
3522                         if (pool[k].key == NULL) continue;
3523                         de = dictFind(dict,pool[k].key);
3524 
3525                         /* Remove the entry from the pool. */
3526                         sdsfree(pool[k].key);
3527                         /* Shift all elements on its right to left. */
3528                         memmove(pool+k,pool+k+1,
3529                             sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
3530                         /* Clear the element on the right which is empty
3531                          * since we shifted one position to the left.  */
3532                         pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key = NULL;
3533                         pool[MAXMEMORY_EVICTION_POOL_SIZE-1].idle = 0;
3534 
3535                         /* If the key exists, is our pick. Otherwise it is
3536                          * a ghost and we need to try the next element. */
3537                         if (de) {
3538                             bestkey = dictGetKey(de);
3539                             break;
3540                         } else {
3541                             /* Ghost... */
3542                             continue;
3543                         }
3544                     }
3545                 }
3546             }
3547 
3548             /* volatile-ttl */
3549             else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
3550                 for (k = 0; k < server.maxmemory_samples; k++) {
3551                     sds thiskey;
3552                     long thisval;
3553 
3554                     de = dictGetRandomKey(dict);
3555                     thiskey = dictGetKey(de);
3556                     thisval = (long) dictGetVal(de);
3557 
3558                     /* Expire sooner (minor expire unix timestamp) is better
3559                      * candidate for deletion */
3560                     if (bestkey == NULL || thisval < bestval) {
3561                         bestkey = thiskey;
3562                         bestval = thisval;
3563                     }
3564                 }
3565             }
3566 
3567             /* Finally remove the selected key. */
3568             if (bestkey) {
3569                 long long delta;
3570 
3571                 robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
3572                 propagateExpire(db,keyobj);
3573                 /* We compute the amount of memory freed by dbDelete() alone.
3574                  * It is possible that actually the memory needed to propagate
3575                  * the DEL in AOF and replication link is greater than the one
3576                  * we are freeing removing the key, but we can't account for
3577                  * that otherwise we would never exit the loop.
3578                  *
3579                  * AOF and Output buffer memory will be freed eventually so
3580                  * we only care about memory used by the key space. */
3581                 delta = (long long) zmalloc_used_memory();
3582                 latencyStartMonitor(eviction_latency);
3583                 dbDelete(db,keyobj);
3584                 latencyEndMonitor(eviction_latency);
3585                 latencyAddSampleIfNeeded("eviction-del",eviction_latency);
3586                 latencyRemoveNestedEvent(latency,eviction_latency);
3587                 delta -= (long long) zmalloc_used_memory();
3588                 mem_freed += delta;
3589                 server.stat_evictedkeys++;
3590                 notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
3591                     keyobj, db->id);
3592                 decrRefCount(keyobj);
3593                 keys_freed++;
3594 
3595                 /* When the memory to free starts to be big enough, we may
3596                  * start spending so much time here that is impossible to
3597                  * deliver data to the slaves fast enough, so we force the
3598                  * transmission here inside the loop. */
3599                 if (slaves) flushSlavesOutputBuffers();
3600             }
3601         }
3602         if (!keys_freed) {
3603             latencyEndMonitor(latency);
3604             latencyAddSampleIfNeeded("eviction-cycle",latency);
3605             return C_ERR; /* nothing to free... */
3606         }
3607     }
3608     latencyEndMonitor(latency);
3609     latencyAddSampleIfNeeded("eviction-cycle",latency);
3610     return C_OK;
3611 }
3612 
3613 /* =================================== Main! ================================ */
3614 
3615 #ifdef __linux__
/* Read the vm.overcommit_memory setting from procfs.
 *
 * Returns the integer parsed (via atoi) from the first line of
 * /proc/sys/vm/overcommit_memory, or -1 if the file cannot be opened
 * or no line can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;
    if (fgets(line,sizeof(line),procfile) != NULL) value = atoi(line);
    fclose(procfile);
    return value;
}
3629 
linuxMemoryWarnings(void)3630 void linuxMemoryWarnings(void) {
3631     if (linuxOvercommitMemoryValue() == 0) {
3632         serverLog(LL_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
3633     }
3634     if (THPIsEnabled()) {
3635         serverLog(LL_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.");
3636     }
3637 }
3638 #endif /* __linux__ */
3639 
createPidFile(void)3640 void createPidFile(void) {
3641     /* If pidfile requested, but no pidfile defined, use
3642      * default pidfile path */
3643     if (!server.pidfile) server.pidfile = zstrdup(CONFIG_DEFAULT_PID_FILE);
3644 
3645     /* Try to write the pid file in a best-effort way. */
3646     FILE *fp = fopen(server.pidfile,"w");
3647     if (fp) {
3648         fprintf(fp,"%d\n",(int)getpid());
3649         fclose(fp);
3650     }
3651 }
3652 
daemonize(void)3653 void daemonize(void) {
3654     int fd;
3655 
3656     if (fork() != 0) exit(0); /* parent exits */
3657     setsid(); /* create a new session */
3658 
3659     /* Every output goes to /dev/null. If Redis is daemonized but
3660      * the 'logfile' is set to 'stdout' in the configuration file
3661      * it will not log at all. */
3662     if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
3663         dup2(fd, STDIN_FILENO);
3664         dup2(fd, STDOUT_FILENO);
3665         dup2(fd, STDERR_FILENO);
3666         if (fd > STDERR_FILENO) close(fd);
3667     }
3668 }
3669 
version(void)3670 void version(void) {
3671     printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n",
3672         REDIS_VERSION,
3673         redisGitSHA1(),
3674         atoi(redisGitDirty()) > 0,
3675         ZMALLOC_LIB,
3676         sizeof(long) == 4 ? 32 : 64,
3677         (unsigned long long) redisBuildId());
3678     exit(0);
3679 }
3680 
/* Print the command line help on standard error and terminate the
 * process with exit status 1. */
void usage(void) {
    static const char *const help_lines[] = {
        "Usage: ./redis-server [/path/to/redis.conf] [options]\n",
        "       ./redis-server - (read config from stdin)\n",
        "       ./redis-server -v or --version\n",
        "       ./redis-server -h or --help\n",
        "       ./redis-server --test-memory <megabytes>\n\n",
        "Examples:\n",
        "       ./redis-server (run the server with default conf)\n",
        "       ./redis-server /etc/redis/6379.conf\n",
        "       ./redis-server --port 7777\n",
        "       ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n",
        "       ./redis-server /etc/myredis.conf --loglevel verbose\n\n",
        "Sentinel mode:\n",
        "       ./redis-server /etc/sentinel.conf --sentinel\n"
    };
    size_t i;

    /* None of the lines contains a conversion specifier, so they are
     * written verbatim. */
    for (i = 0; i < sizeof(help_lines)/sizeof(help_lines[0]); i++)
        fputs(help_lines[i],stderr);
    exit(1);
}
3697 
redisAsciiArt(void)3698 void redisAsciiArt(void) {
3699 #include "asciilogo.h"
3700     char *buf = zmalloc(1024*16);
3701     char *mode;
3702 
3703     if (server.cluster_enabled) mode = "cluster";
3704     else if (server.sentinel_mode) mode = "sentinel";
3705     else mode = "standalone";
3706 
3707     if (server.syslog_enabled) {
3708         serverLog(LL_NOTICE,
3709             "Redis %s (%s/%d) %s bit, %s mode, port %d, pid %ld ready to start.",
3710             REDIS_VERSION,
3711             redisGitSHA1(),
3712             strtol(redisGitDirty(),NULL,10) > 0,
3713             (sizeof(long) == 8) ? "64" : "32",
3714             mode, server.port,
3715             (long) getpid()
3716         );
3717     } else {
3718         snprintf(buf,1024*16,ascii_logo,
3719             REDIS_VERSION,
3720             redisGitSHA1(),
3721             strtol(redisGitDirty(),NULL,10) > 0,
3722             (sizeof(long) == 8) ? "64" : "32",
3723             mode, server.port,
3724             (long) getpid()
3725         );
3726         serverLogRaw(LL_NOTICE|LL_RAW,buf);
3727     }
3728     zfree(buf);
3729 }
3730 
sigShutdownHandler(int sig)3731 static void sigShutdownHandler(int sig) {
3732     char *msg;
3733 
3734     switch (sig) {
3735     case SIGINT:
3736         msg = "Received SIGINT scheduling shutdown...";
3737         break;
3738     case SIGTERM:
3739         msg = "Received SIGTERM scheduling shutdown...";
3740         break;
3741     default:
3742         msg = "Received shutdown signal, scheduling shutdown...";
3743     };
3744 
3745     /* SIGINT is often delivered via Ctrl+C in an interactive session.
3746      * If we receive the signal the second time, we interpret this as
3747      * the user really wanting to quit ASAP without waiting to persist
3748      * on disk. */
3749     if (server.shutdown_asap && sig == SIGINT) {
3750         serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
3751         rdbRemoveTempFile(getpid());
3752         exit(1); /* Exit with an error since this was not a clean shutdown. */
3753     } else if (server.loading) {
3754         exit(0);
3755     }
3756 
3757     serverLogFromHandler(LL_WARNING, msg);
3758     server.shutdown_asap = 1;
3759 }
3760 
setupSignalHandlers(void)3761 void setupSignalHandlers(void) {
3762     struct sigaction act;
3763 
3764     /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
3765      * Otherwise, sa_handler is used. */
3766     sigemptyset(&act.sa_mask);
3767     act.sa_flags = 0;
3768     act.sa_handler = sigShutdownHandler;
3769     sigaction(SIGTERM, &act, NULL);
3770     sigaction(SIGINT, &act, NULL);
3771 
3772 #ifdef HAVE_BACKTRACE
3773     sigemptyset(&act.sa_mask);
3774     act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
3775     act.sa_sigaction = sigsegvHandler;
3776     sigaction(SIGSEGV, &act, NULL);
3777     sigaction(SIGBUS, &act, NULL);
3778     sigaction(SIGFPE, &act, NULL);
3779     sigaction(SIGILL, &act, NULL);
3780 #endif
3781     return;
3782 }
3783 
3784 void memtest(size_t megabytes, int passes);
3785 
/* Tell whether the server was asked to run in Sentinel mode.
 *
 * Returns 1 if argv[0] contains the substring "redis-sentinel" (a
 * substring match, so any path to such an executable qualifies) or if one
 * of the following arguments is exactly "--sentinel". Returns 0
 * otherwise. */
int checkForSentinelMode(int argc, char **argv) {
    int i = 1;

    if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
    while (i < argc) {
        if (strcmp(argv[i],"--sentinel") == 0) return 1;
        i++;
    }
    return 0;
}
3796 
3797 /* Function called at startup to load RDB or AOF file in memory. */
loadDataFromDisk(void)3798 void loadDataFromDisk(void) {
3799     long long start = ustime();
3800     if (server.aof_state == AOF_ON) {
3801         if (loadAppendOnlyFile(server.aof_filename) == C_OK)
3802             serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
3803     } else {
3804         if (rdbLoad(server.rdb_filename) == C_OK) {
3805             serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
3806                 (float)(ustime()-start)/1000000);
3807         } else if (errno != ENOENT) {
3808             serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
3809             exit(1);
3810         }
3811     }
3812 }
3813 
redisOutOfMemoryHandler(size_t allocation_size)3814 void redisOutOfMemoryHandler(size_t allocation_size) {
3815     serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!",
3816         allocation_size);
3817     serverPanic("Redis aborting for OUT OF MEMORY");
3818 }
3819 
/* Set the process title to "<title> <addr>:<port><mode>".
 *
 * <addr> is the first configured bind address, or "*" when there is
 * none; <mode> is " [cluster]", " [sentinel]" or empty. The function does
 * nothing when the build does not define USE_SETPROCTITLE. */
void redisSetProcTitle(char *title) {
#ifdef USE_SETPROCTITLE
    const char *addr = server.bindaddr_count ? server.bindaddr[0] : "*";
    const char *mode_suffix = "";

    if (server.cluster_enabled) {
        mode_suffix = " [cluster]";
    } else if (server.sentinel_mode) {
        mode_suffix = " [sentinel]";
    }

    setproctitle("%s %s:%d%s", title, addr, server.port, mode_suffix);
#else
    UNUSED(title);
#endif
}
3835 
3836 /*
3837  * Check whether systemd or upstart have been used to start redis.
3838  */
3839 
redisSupervisedUpstart(void)3840 int redisSupervisedUpstart(void) {
3841     const char *upstart_job = getenv("UPSTART_JOB");
3842 
3843     if (!upstart_job) {
3844         serverLog(LL_WARNING,
3845                 "upstart supervision requested, but UPSTART_JOB not found");
3846         return 0;
3847     }
3848 
3849     serverLog(LL_NOTICE, "supervised by upstart, will stop to signal readiness");
3850     raise(SIGSTOP);
3851     unsetenv("UPSTART_JOB");
3852     return 1;
3853 }
3854 
redisSupervisedSystemd(void)3855 int redisSupervisedSystemd(void) {
3856     const char *notify_socket = getenv("NOTIFY_SOCKET");
3857     int fd = 1;
3858     struct sockaddr_un su;
3859     struct iovec iov;
3860     struct msghdr hdr;
3861     int sendto_flags = 0;
3862 
3863     if (!notify_socket) {
3864         serverLog(LL_WARNING,
3865                 "systemd supervision requested, but NOTIFY_SOCKET not found");
3866         return 0;
3867     }
3868 
3869     if ((strchr("@/", notify_socket[0])) == NULL || strlen(notify_socket) < 2) {
3870         return 0;
3871     }
3872 
3873     serverLog(LL_NOTICE, "supervised by systemd, will signal readiness");
3874     if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) {
3875         serverLog(LL_WARNING,
3876                 "Can't connect to systemd socket %s", notify_socket);
3877         return 0;
3878     }
3879 
3880     memset(&su, 0, sizeof(su));
3881     su.sun_family = AF_UNIX;
3882     strncpy (su.sun_path, notify_socket, sizeof(su.sun_path) -1);
3883     su.sun_path[sizeof(su.sun_path) - 1] = '\0';
3884 
3885     if (notify_socket[0] == '@')
3886         su.sun_path[0] = '\0';
3887 
3888     memset(&iov, 0, sizeof(iov));
3889     iov.iov_base = "READY=1";
3890     iov.iov_len = strlen("READY=1");
3891 
3892     memset(&hdr, 0, sizeof(hdr));
3893     hdr.msg_name = &su;
3894     hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) +
3895         strlen(notify_socket);
3896     hdr.msg_iov = &iov;
3897     hdr.msg_iovlen = 1;
3898 
3899     unsetenv("NOTIFY_SOCKET");
3900 #ifdef HAVE_MSG_NOSIGNAL
3901     sendto_flags |= MSG_NOSIGNAL;
3902 #endif
3903     if (sendmsg(fd, &hdr, sendto_flags) < 0) {
3904         serverLog(LL_WARNING, "Can't send notification to systemd");
3905         close(fd);
3906         return 0;
3907     }
3908     close(fd);
3909     return 1;
3910 }
3911 
redisIsSupervised(int mode)3912 int redisIsSupervised(int mode) {
3913     if (mode == SUPERVISED_AUTODETECT) {
3914         const char *upstart_job = getenv("UPSTART_JOB");
3915         const char *notify_socket = getenv("NOTIFY_SOCKET");
3916 
3917         if (upstart_job) {
3918             redisSupervisedUpstart();
3919         } else if (notify_socket) {
3920             redisSupervisedSystemd();
3921         }
3922     } else if (mode == SUPERVISED_UPSTART) {
3923         return redisSupervisedUpstart();
3924     } else if (mode == SUPERVISED_SYSTEMD) {
3925         return redisSupervisedSystemd();
3926     }
3927 
3928     return 0;
3929 }
3930 
3931 
main(int argc,char ** argv)3932 int main(int argc, char **argv) {
3933     struct timeval tv;
3934     int j;
3935 
3936 #ifdef REDIS_TEST
3937     if (argc == 3 && !strcasecmp(argv[1], "test")) {
3938         if (!strcasecmp(argv[2], "ziplist")) {
3939             return ziplistTest(argc, argv);
3940         } else if (!strcasecmp(argv[2], "quicklist")) {
3941             quicklistTest(argc, argv);
3942         } else if (!strcasecmp(argv[2], "intset")) {
3943             return intsetTest(argc, argv);
3944         } else if (!strcasecmp(argv[2], "zipmap")) {
3945             return zipmapTest(argc, argv);
3946         } else if (!strcasecmp(argv[2], "sha1test")) {
3947             return sha1Test(argc, argv);
3948         } else if (!strcasecmp(argv[2], "util")) {
3949             return utilTest(argc, argv);
3950         } else if (!strcasecmp(argv[2], "sds")) {
3951             return sdsTest(argc, argv);
3952         } else if (!strcasecmp(argv[2], "endianconv")) {
3953             return endianconvTest(argc, argv);
3954         } else if (!strcasecmp(argv[2], "crc64")) {
3955             return crc64Test(argc, argv);
3956         }
3957 
3958         return -1; /* test not found */
3959     }
3960 #endif
3961 
3962     /* We need to initialize our libraries, and the server configuration. */
3963 #ifdef INIT_SETPROCTITLE_REPLACEMENT
3964     spt_init(argc, argv);
3965 #endif
3966     setlocale(LC_COLLATE,"");
3967     zmalloc_enable_thread_safeness();
3968     zmalloc_set_oom_handler(redisOutOfMemoryHandler);
3969     srand(time(NULL)^getpid());
3970     gettimeofday(&tv,NULL);
3971     dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
3972     server.sentinel_mode = checkForSentinelMode(argc,argv);
3973     initServerConfig();
3974 
3975     /* Store the executable path and arguments in a safe place in order
3976      * to be able to restart the server later. */
3977     server.executable = getAbsolutePath(argv[0]);
3978     server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
3979     server.exec_argv[argc] = NULL;
3980     for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
3981 
3982     /* We need to init sentinel right now as parsing the configuration file
3983      * in sentinel mode will have the effect of populating the sentinel
3984      * data structures with master nodes to monitor. */
3985     if (server.sentinel_mode) {
3986         initSentinelConfig();
3987         initSentinel();
3988     }
3989 
3990     /* Check if we need to start in redis-check-rdb mode. We just execute
3991      * the program main. However the program is part of the Redis executable
3992      * so that we can easily execute an RDB check on loading errors. */
3993     if (strstr(argv[0],"redis-check-rdb") != NULL)
3994         redis_check_rdb_main(argc,argv);
3995 
3996     if (argc >= 2) {
3997         j = 1; /* First option to parse in argv[] */
3998         sds options = sdsempty();
3999         char *configfile = NULL;
4000 
4001         /* Handle special options --help and --version */
4002         if (strcmp(argv[1], "-v") == 0 ||
4003             strcmp(argv[1], "--version") == 0) version();
4004         if (strcmp(argv[1], "--help") == 0 ||
4005             strcmp(argv[1], "-h") == 0) usage();
4006         if (strcmp(argv[1], "--test-memory") == 0) {
4007             if (argc == 3) {
4008                 memtest(atoi(argv[2]),50);
4009                 exit(0);
4010             } else {
4011                 fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
4012                 fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
4013                 exit(1);
4014             }
4015         }
4016 
4017         /* First argument is the config file name? */
4018         if (argv[j][0] != '-' || argv[j][1] != '-') {
4019             configfile = argv[j];
4020             server.configfile = getAbsolutePath(configfile);
4021             /* Replace the config file in server.exec_argv with
4022              * its absoulte path. */
4023             zfree(server.exec_argv[j]);
4024             server.exec_argv[j] = zstrdup(server.configfile);
4025             j++;
4026         }
4027 
4028         /* All the other options are parsed and conceptually appended to the
4029          * configuration file. For instance --port 6380 will generate the
4030          * string "port 6380\n" to be parsed after the actual file name
4031          * is parsed, if any. */
4032         while(j != argc) {
4033             if (argv[j][0] == '-' && argv[j][1] == '-') {
4034                 /* Option name */
4035                 if (!strcmp(argv[j], "--check-rdb")) {
4036                     /* Argument has no options, need to skip for parsing. */
4037                     j++;
4038                     continue;
4039                 }
4040                 if (sdslen(options)) options = sdscat(options,"\n");
4041                 options = sdscat(options,argv[j]+2);
4042                 options = sdscat(options," ");
4043             } else {
4044                 /* Option argument */
4045                 options = sdscatrepr(options,argv[j],strlen(argv[j]));
4046                 options = sdscat(options," ");
4047             }
4048             j++;
4049         }
4050         if (server.sentinel_mode && configfile && *configfile == '-') {
4051             serverLog(LL_WARNING,
4052                 "Sentinel config from STDIN not allowed.");
4053             serverLog(LL_WARNING,
4054                 "Sentinel needs config file on disk to save state.  Exiting...");
4055             exit(1);
4056         }
4057         resetServerSaveParams();
4058         loadServerConfig(configfile,options);
4059         sdsfree(options);
4060     } else {
4061         serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
4062     }
4063 
4064     server.supervised = redisIsSupervised(server.supervised_mode);
4065     int background = server.daemonize && !server.supervised;
4066     if (background) daemonize();
4067 
4068     initServer();
4069     if (background || server.pidfile) createPidFile();
4070     redisSetProcTitle(argv[0]);
4071     redisAsciiArt();
4072     checkTcpBacklogSettings();
4073 
4074     if (!server.sentinel_mode) {
4075         /* Things not needed when running in Sentinel mode. */
4076         serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION);
4077     #ifdef __linux__
4078         linuxMemoryWarnings();
4079     #endif
4080         loadDataFromDisk();
4081         if (server.cluster_enabled) {
4082             if (verifyClusterConfigWithData() == C_ERR) {
4083                 serverLog(LL_WARNING,
4084                     "You can't have keys in a DB different than DB 0 when in "
4085                     "Cluster mode. Exiting.");
4086                 exit(1);
4087             }
4088         }
4089         if (server.ipfd_count > 0)
4090             serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4091         if (server.sofd > 0)
4092             serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
4093     } else {
4094         sentinelIsRunning();
4095     }
4096 
4097     /* Warning the user about suspicious maxmemory setting. */
4098     if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
4099         serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
4100     }
4101 
4102     aeSetBeforeSleepProc(server.el,beforeSleep);
4103     aeMain(server.el);
4104     aeDeleteEventLoop(server.el);
4105     return 0;
4106 }
4107 
4108 /* The End */
4109