xref: /f-stack/app/redis-5.0.5/src/server.c (revision 572c4311)
1 /*
2  * Copyright (c) 2009-2016, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "cluster.h"
32 #include "slowlog.h"
33 #include "bio.h"
34 #include "latency.h"
35 #include "atomicvar.h"
36 
37 #ifdef HAVE_FF_KQUEUE
38 #include "ff_api.h"
39 #include "anet_ff.h"
40 #endif
41 
42 #include <time.h>
43 #include <signal.h>
44 #include <sys/wait.h>
45 #include <errno.h>
46 #include <assert.h>
47 #include <ctype.h>
48 #include <stdarg.h>
49 #include <arpa/inet.h>
50 #include <sys/stat.h>
51 #include <fcntl.h>
52 #include <sys/time.h>
53 #include <sys/resource.h>
54 #include <sys/uio.h>
55 #include <sys/un.h>
56 #include <limits.h>
57 #include <float.h>
58 #include <math.h>
59 #include <sys/resource.h>
60 #include <sys/utsname.h>
61 #include <locale.h>
62 #include <sys/socket.h>
63 
64 /* Our shared "common" objects */
65 
66 struct sharedObjectsStruct shared;
67 
68 /* Global vars that are actually used as constants. The following double
69  * values are used for double on-disk serialization, and are initialized
70  * at runtime to avoid strange compiler optimizations. */
71 
72 double R_Zero, R_PosInf, R_NegInf, R_Nan;
73 
74 /*================================= Globals ================================= */
75 
76 /* Global vars */
77 struct redisServer server; /* Server global state */
78 volatile unsigned long lru_clock; /* Server global current LRU time. */
79 
/* Our command table.
 *
 * Every entry is composed of the following fields, in this exact order (the
 * entries below are positional initializers of struct redisCommand):
 *
 * name: a string representing the command name.
 * function: pointer to the C function implementing the command.
 * arity: number of arguments, it is possible to use -N to say >= N.
 *        The command name itself counts as an argument: GET has arity 2
 *        (GET key) and its first key is at index 1.
 * sflags: command flags as string. See below for a table of flags.
 * flags: flags as bitmask. Computed by Redis using the 'sflags' field.
 * get_keys_proc: an optional function to get key arguments from a command.
 *                This is only used when the following three fields are not
 *                enough to specify what arguments are keys.
 * first_key_index: first argument that is a key (0 if there are no keys).
 * last_key_index: last argument that is a key. Negative values appear to be
 *                 relative to the end of the arguments: DEL uses -1, BLPOP
 *                 uses -2, presumably to exclude its trailing timeout.
 * key_step: step to get all the keys from first to last argument. For instance
 *           in MSET the step is two since arguments are key,val,key,val,...
 * microseconds: microseconds of total execution time for this command.
 * calls: total number of calls of this command.
 *
 * The flags, microseconds and calls fields are computed by Redis and should
 * always be set to zero.
 *
 * Command flags are expressed using strings where every character represents
 * a flag. Later the populateCommandTable() function will take care of
 * populating the real 'flags' field using these characters.
 *
 * This is the meaning of the flags:
 *
 * w: write command (may modify the key space).
 * r: read command  (will never modify the key space).
 * m: may increase memory usage once called. Don't allow if out of memory.
 * a: admin command, like SAVE or SHUTDOWN.
 * p: Pub/Sub related command.
 * f: force replication of this command, regardless of server.dirty.
 * s: command not allowed in scripts.
 * R: random command. Command is not deterministic, that is, the same command
 *    with the same arguments, with the same key space, may have different
 *    results. For instance SPOP and RANDOMKEY are two random commands.
 * S: Sort command output array if called from script, so that the output
 *    is deterministic.
 * l: Allow command while loading the database.
 * t: Allow command while a slave has stale data but is not allowed to
 *    serve this data. Normally no command is accepted in this condition
 *    but just a few.
 * M: Do not automatically propagate the command on MONITOR.
 * k: Perform an implicit ASKING for this command, so the command will be
 *    accepted in cluster mode if the slot is marked as 'importing'.
 * F: Fast command: O(1) or O(log(N)) command that should never delay
 *    its execution as long as the kernel scheduler is giving us time.
 *    Note that commands that may trigger a DEL as a side effect (like SET)
 *    are not fast commands.
 */
struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
    {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0},
    {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0},
    {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
    {"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpushx",lpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
    {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"brpop",brpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
    {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
    {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
    {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
    {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
    {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
    {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
    {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"spop",spopCommand,-2,"wRF",0,NULL,1,1,1,0,0},
    {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
    {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
    {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
    {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
    {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
    {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
    {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
    {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
    {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
    {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
    {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0},
    {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0},
    {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0},
    {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0},
    {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
    {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
    {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0},
    {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0},
    {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
    {"zpopmin",zpopminCommand,-2,"wF",0,NULL,1,1,1,0,0},
    {"zpopmax",zpopmaxCommand,-2,"wF",0,NULL,1,1,1,0,0},
    {"bzpopmin",bzpopminCommand,-3,"wsF",0,NULL,1,-2,1,0,0},
    {"bzpopmax",bzpopmaxCommand,-3,"wsF",0,NULL,1,-2,1,0,0},
    {"hset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
    {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
    {"hmget",hmgetCommand,-3,"rF",0,NULL,1,1,1,0,0},
    {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0},
    {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
    {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
    {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
    {"hgetall",hgetallCommand,2,"rR",0,NULL,1,1,1,0,0},
    {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
    {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
    {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
    {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
    {"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0},
    {"swapdb",swapdbCommand,3,"wF",0,NULL,0,0,0,0,0},
    {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0},
    {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0},
    {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0},
    {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
    {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0},
    {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0},
    {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0},
    {"ping",pingCommand,-1,"tF",0,NULL,0,0,0,0,0},
    {"echo",echoCommand,2,"F",0,NULL,0,0,0,0,0},
    {"save",saveCommand,1,"as",0,NULL,0,0,0,0,0},
    {"bgsave",bgsaveCommand,-1,"as",0,NULL,0,0,0,0,0},
    {"bgrewriteaof",bgrewriteaofCommand,1,"as",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"aslt",0,NULL,0,0,0,0,0},
    {"lastsave",lastsaveCommand,1,"RF",0,NULL,0,0,0,0,0},
    {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0},
    {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
    {"discard",discardCommand,1,"sF",0,NULL,0,0,0,0,0},
    {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
    {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
    {"replconf",replconfCommand,-1,"aslt",0,NULL,0,0,0,0,0},
    {"flushdb",flushdbCommand,-1,"w",0,NULL,0,0,0,0,0},
    {"flushall",flushallCommand,-1,"w",0,NULL,0,0,0,0,0},
    {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0},
    {"info",infoCommand,-1,"ltR",0,NULL,0,0,0,0,0},
    {"monitor",monitorCommand,1,"as",0,NULL,0,0,0,0,0},
    {"ttl",ttlCommand,2,"rFR",0,NULL,1,1,1,0,0},
    {"touch",touchCommand,-2,"rF",0,NULL,1,1,1,0,0},
    {"pttl",pttlCommand,2,"rFR",0,NULL,1,1,1,0,0},
    {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"slaveof",replicaofCommand,3,"ast",0,NULL,0,0,0,0,0},
    {"replicaof",replicaofCommand,3,"ast",0,NULL,0,0,0,0,0},
    {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
    {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"config",configCommand,-2,"last",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
    {"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0},
    {"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},
    {"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0},
    {"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0},
    {"cluster",clusterCommand,-2,"a",0,NULL,0,0,0,0,0},
    {"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
    {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
    {"migrate",migrateCommand,-6,"wR",0,migrateGetKeys,0,0,0,0,0},
    {"asking",askingCommand,1,"F",0,NULL,0,0,0,0,0},
    {"readonly",readonlyCommand,1,"F",0,NULL,0,0,0,0,0},
    {"readwrite",readwriteCommand,1,"F",0,NULL,0,0,0,0,0},
    {"dump",dumpCommand,2,"rR",0,NULL,1,1,1,0,0},
    {"object",objectCommand,-2,"rR",0,NULL,2,2,1,0,0},
    {"memory",memoryCommand,-2,"rR",0,NULL,0,0,0,0,0},
    {"client",clientCommand,-2,"as",0,NULL,0,0,0,0,0},
    {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
    {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
    {"slowlog",slowlogCommand,-2,"aR",0,NULL,0,0,0,0,0},
    {"script",scriptCommand,-2,"s",0,NULL,0,0,0,0,0},
    {"time",timeCommand,1,"RF",0,NULL,0,0,0,0,0},
    {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
    {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
    {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0},
    {"wait",waitCommand,3,"s",0,NULL,0,0,0,0,0},
    {"command",commandCommand,0,"ltR",0,NULL,0,0,0,0,0},
    {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
    {"georadius",georadiusCommand,-6,"w",0,georadiusGetKeys,1,1,1,0,0},
    {"georadius_ro",georadiusroCommand,-6,"r",0,georadiusGetKeys,1,1,1,0,0},
    {"georadiusbymember",georadiusbymemberCommand,-5,"w",0,georadiusGetKeys,1,1,1,0,0},
    {"georadiusbymember_ro",georadiusbymemberroCommand,-5,"r",0,georadiusGetKeys,1,1,1,0,0},
    {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
    {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
    {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"pfselftest",pfselftestCommand,1,"a",0,NULL,0,0,0,0,0},
    {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
    {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
    {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
    {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
    {"xadd",xaddCommand,-5,"wmFR",0,NULL,1,1,1,0,0},
    {"xrange",xrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"xrevrange",xrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
    {"xlen",xlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"xread",xreadCommand,-4,"rs",0,xreadGetKeys,1,1,1,0,0},
    {"xreadgroup",xreadCommand,-7,"ws",0,xreadGetKeys,1,1,1,0,0},
    {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
    {"xsetid",xsetidCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"xack",xackCommand,-4,"wF",0,NULL,1,1,1,0,0},
    {"xpending",xpendingCommand,-3,"rR",0,NULL,1,1,1,0,0},
    {"xclaim",xclaimCommand,-6,"wRF",0,NULL,1,1,1,0,0},
    {"xinfo",xinfoCommand,-2,"rR",0,NULL,2,2,1,0,0},
    {"xdel",xdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
    {"xtrim",xtrimCommand,-2,"wFR",0,NULL,1,1,1,0,0},
    /* "post" and "host:" look like the start of an HTTP request rather than
     * real commands; presumably they exist to catch cross-protocol attempts
     * and route them to securityWarningCommand (see its definition). */
    {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
    {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
    {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0},
    {"lolwut",lolwutCommand,-1,"r",0,NULL,0,0,0,0,0}
};
334 
335 /*============================ Utility functions ============================ */
336 
337 /* We use a private localtime implementation which is fork-safe. The logging
338  * function of Redis may be called from other threads. */
339 void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst);
340 
341 /* Low level logging. To use only for very big messages, otherwise
342  * serverLog() is to prefer. */
serverLogRaw(int level,const char * msg)343 void serverLogRaw(int level, const char *msg) {
344     const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING };
345     const char *c = ".-*#";
346     FILE *fp;
347     char buf[64];
348     int rawmode = (level & LL_RAW);
349     int log_to_stdout = server.logfile[0] == '\0';
350 
351     level &= 0xff; /* clear flags */
352     if (level < server.verbosity) return;
353 
354     fp = log_to_stdout ? stdout : fopen(server.logfile,"a");
355     if (!fp) return;
356 
357     if (rawmode) {
358         fprintf(fp,"%s",msg);
359     } else {
360         int off;
361         struct timeval tv;
362         int role_char;
363         pid_t pid = getpid();
364 
365         gettimeofday(&tv,NULL);
366         struct tm tm;
367         nolocks_localtime(&tm,tv.tv_sec,server.timezone,server.daylight_active);
368         off = strftime(buf,sizeof(buf),"%d %b %Y %H:%M:%S.",&tm);
369         snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000);
370         if (server.sentinel_mode) {
371             role_char = 'X'; /* Sentinel. */
372         } else if (pid != server.pid) {
373             role_char = 'C'; /* RDB / AOF writing child. */
374         } else {
375             role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */
376         }
377         fprintf(fp,"%d:%c %s %c %s\n",
378             (int)getpid(),role_char, buf,c[level],msg);
379     }
380     fflush(fp);
381 
382     if (!log_to_stdout) fclose(fp);
383     if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg);
384 }
385 
386 /* Like serverLogRaw() but with printf-alike support. This is the function that
387  * is used across the code. The raw version is only used in order to dump
388  * the INFO output on crash. */
serverLog(int level,const char * fmt,...)389 void serverLog(int level, const char *fmt, ...) {
390     va_list ap;
391     char msg[LOG_MAX_LEN];
392 
393     if ((level&0xff) < server.verbosity) return;
394 
395     va_start(ap, fmt);
396     vsnprintf(msg, sizeof(msg), fmt, ap);
397     va_end(ap);
398 
399     serverLogRaw(level,msg);
400 }
401 
402 /* Log a fixed message without printf-alike capabilities, in a way that is
403  * safe to call from a signal handler.
404  *
405  * We actually use this only for signals that are not fatal from the point
406  * of view of Redis. Signals that are going to kill the server anyway and
407  * where we need printf-alike features are served by serverLog(). */
serverLogFromHandler(int level,const char * msg)408 void serverLogFromHandler(int level, const char *msg) {
409     int fd;
410     int log_to_stdout = server.logfile[0] == '\0';
411     char buf[64];
412 
413     if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize))
414         return;
415     fd = log_to_stdout ? STDOUT_FILENO :
416                          open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644);
417     if (fd == -1) return;
418     ll2string(buf,sizeof(buf),getpid());
419     if (write(fd,buf,strlen(buf)) == -1) goto err;
420     if (write(fd,":signal-handler (",17) == -1) goto err;
421     ll2string(buf,sizeof(buf),time(NULL));
422     if (write(fd,buf,strlen(buf)) == -1) goto err;
423     if (write(fd,") ",2) == -1) goto err;
424     if (write(fd,msg,strlen(msg)) == -1) goto err;
425     if (write(fd,"\n",1) == -1) goto err;
426 err:
427     if (!log_to_stdout) close(fd);
428 }
429 
430 /* Return the UNIX time in microseconds */
ustime(void)431 long long ustime(void) {
432     struct timeval tv;
433     long long ust;
434 
435     gettimeofday(&tv, NULL);
436     ust = ((long long)tv.tv_sec)*1000000;
437     ust += tv.tv_usec;
438     return ust;
439 }
440 
441 /* Return the UNIX time in milliseconds */
mstime(void)442 mstime_t mstime(void) {
443     return ustime()/1000;
444 }
445 
/* Terminate a child process (e.g. after an RDB dump or AOF rewrite) with
 * the given return code.
 *
 * Children normally leave through _exit(). exit() would run the normal
 * process teardown (atexit handlers, stdio flushing), which may interact
 * with the same file objects used by the parent process. Coverage builds
 * (COVERAGE_TEST) use exit() instead, so that the coverage information is
 * obtained correctly. */
void exitFromChild(int retcode) {
#ifndef COVERAGE_TEST
    _exit(retcode);
#else
    exit(retcode);
#endif
}
457 
458 /*====================== Hash table type implementation  ==================== */
459 
460 /* This is a hash table type that uses the SDS dynamic strings library as
461  * keys and redis objects as values (objects can hold SDS strings,
462  * lists, sets). */
463 
/* Generic destructor: releases 'val' with zfree(). 'privdata' is unused. */
void dictVanillaFree(void *privdata, void *val) {
    void *mem = val;

    DICT_NOTUSED(privdata);
    zfree(mem);
}
469 
dictListDestructor(void * privdata,void * val)470 void dictListDestructor(void *privdata, void *val)
471 {
472     DICT_NOTUSED(privdata);
473     listRelease((list*)val);
474 }
475 
dictSdsKeyCompare(void * privdata,const void * key1,const void * key2)476 int dictSdsKeyCompare(void *privdata, const void *key1,
477         const void *key2)
478 {
479     int l1,l2;
480     DICT_NOTUSED(privdata);
481 
482     l1 = sdslen((sds)key1);
483     l2 = sdslen((sds)key2);
484     if (l1 != l2) return 0;
485     return memcmp(key1, key2, l1) == 0;
486 }
487 
/* Case-insensitive key comparison, used for the command lookup table and
 * other places where case does not matter. Returns 1 when the keys match.
 * NOTE: based on strcasecmp(), so it is NOT binary safe; the comparison
 * stops at the first NUL byte. 'privdata' is unused. */
int dictSdsKeyCaseCompare(void *privdata, const void *key1,
        const void *key2)
{
    int diff;

    DICT_NOTUSED(privdata);
    diff = strcasecmp(key1, key2);
    return diff == 0;
}
497 
dictObjectDestructor(void * privdata,void * val)498 void dictObjectDestructor(void *privdata, void *val)
499 {
500     DICT_NOTUSED(privdata);
501 
502     if (val == NULL) return; /* Lazy freeing will set value to NULL. */
503     decrRefCount(val);
504 }
505 
dictSdsDestructor(void * privdata,void * val)506 void dictSdsDestructor(void *privdata, void *val)
507 {
508     DICT_NOTUSED(privdata);
509 
510     sdsfree(val);
511 }
512 
dictObjKeyCompare(void * privdata,const void * key1,const void * key2)513 int dictObjKeyCompare(void *privdata, const void *key1,
514         const void *key2)
515 {
516     const robj *o1 = key1, *o2 = key2;
517     return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
518 }
519 
dictObjHash(const void * key)520 uint64_t dictObjHash(const void *key) {
521     const robj *o = key;
522     return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
523 }
524 
/* Binary-safe hash of an sds key over its full sdslen() bytes. */
uint64_t dictSdsHash(const void *key) {
    size_t len = sdslen((char*)key);

    return dictGenHashFunction((unsigned char*)key, len);
}
528 
/* Case-insensitive hash of an sds key, to pair with dictSdsKeyCaseCompare(). */
uint64_t dictSdsCaseHash(const void *key) {
    size_t len = sdslen((char*)key);

    return dictGenCaseHashFunction((unsigned char*)key, len);
}
532 
dictEncObjKeyCompare(void * privdata,const void * key1,const void * key2)533 int dictEncObjKeyCompare(void *privdata, const void *key1,
534         const void *key2)
535 {
536     robj *o1 = (robj*) key1, *o2 = (robj*) key2;
537     int cmp;
538 
539     if (o1->encoding == OBJ_ENCODING_INT &&
540         o2->encoding == OBJ_ENCODING_INT)
541             return o1->ptr == o2->ptr;
542 
543     o1 = getDecodedObject(o1);
544     o2 = getDecodedObject(o2);
545     cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
546     decrRefCount(o1);
547     decrRefCount(o2);
548     return cmp;
549 }
550 
dictEncObjHash(const void * key)551 uint64_t dictEncObjHash(const void *key) {
552     robj *o = (robj*) key;
553 
554     if (sdsEncodedObject(o)) {
555         return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
556     } else {
557         if (o->encoding == OBJ_ENCODING_INT) {
558             char buf[32];
559             int len;
560 
561             len = ll2string(buf,32,(long)o->ptr);
562             return dictGenHashFunction((unsigned char*)buf, len);
563         } else {
564             uint64_t hash;
565 
566             o = getDecodedObject(o);
567             hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
568             decrRefCount(o);
569             return hash;
570         }
571     }
572 }
573 
574 /* Generic hash table type where keys are Redis Objects, Values
575  * dummy pointers. */
576 dictType objectKeyPointerValueDictType = {
577     dictEncObjHash,            /* hash function */
578     NULL,                      /* key dup */
579     NULL,                      /* val dup */
580     dictEncObjKeyCompare,      /* key compare */
581     dictObjectDestructor,      /* key destructor */
582     NULL                       /* val destructor */
583 };
584 
585 /* Like objectKeyPointerValueDictType(), but values can be destroyed, if
586  * not NULL, calling zfree(). */
587 dictType objectKeyHeapPointerValueDictType = {
588     dictEncObjHash,            /* hash function */
589     NULL,                      /* key dup */
590     NULL,                      /* val dup */
591     dictEncObjKeyCompare,      /* key compare */
592     dictObjectDestructor,      /* key destructor */
593     dictVanillaFree            /* val destructor */
594 };
595 
596 /* Set dictionary type. Keys are SDS strings, values are ot used. */
597 dictType setDictType = {
598     dictSdsHash,               /* hash function */
599     NULL,                      /* key dup */
600     NULL,                      /* val dup */
601     dictSdsKeyCompare,         /* key compare */
602     dictSdsDestructor,         /* key destructor */
603     NULL                       /* val destructor */
604 };
605 
606 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
607 dictType zsetDictType = {
608     dictSdsHash,               /* hash function */
609     NULL,                      /* key dup */
610     NULL,                      /* val dup */
611     dictSdsKeyCompare,         /* key compare */
612     NULL,                      /* Note: SDS string shared & freed by skiplist */
613     NULL                       /* val destructor */
614 };
615 
616 /* Db->dict, keys are sds strings, vals are Redis objects. */
617 dictType dbDictType = {
618     dictSdsHash,                /* hash function */
619     NULL,                       /* key dup */
620     NULL,                       /* val dup */
621     dictSdsKeyCompare,          /* key compare */
622     dictSdsDestructor,          /* key destructor */
623     dictObjectDestructor   /* val destructor */
624 };
625 
626 /* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */
627 dictType shaScriptObjectDictType = {
628     dictSdsCaseHash,            /* hash function */
629     NULL,                       /* key dup */
630     NULL,                       /* val dup */
631     dictSdsKeyCaseCompare,      /* key compare */
632     dictSdsDestructor,          /* key destructor */
633     dictObjectDestructor        /* val destructor */
634 };
635 
636 /* Db->expires */
637 dictType keyptrDictType = {
638     dictSdsHash,                /* hash function */
639     NULL,                       /* key dup */
640     NULL,                       /* val dup */
641     dictSdsKeyCompare,          /* key compare */
642     NULL,                       /* key destructor */
643     NULL                        /* val destructor */
644 };
645 
646 /* Command table. sds string -> command struct pointer. */
647 dictType commandTableDictType = {
648     dictSdsCaseHash,            /* hash function */
649     NULL,                       /* key dup */
650     NULL,                       /* val dup */
651     dictSdsKeyCaseCompare,      /* key compare */
652     dictSdsDestructor,          /* key destructor */
653     NULL                        /* val destructor */
654 };
655 
656 /* Hash type hash table (note that small hashes are represented with ziplists) */
657 dictType hashDictType = {
658     dictSdsHash,                /* hash function */
659     NULL,                       /* key dup */
660     NULL,                       /* val dup */
661     dictSdsKeyCompare,          /* key compare */
662     dictSdsDestructor,          /* key destructor */
663     dictSdsDestructor           /* val destructor */
664 };
665 
666 /* Keylist hash table type has unencoded redis objects as keys and
667  * lists as values. It's used for blocking operations (BLPOP) and to
668  * map swapped keys to a list of clients waiting for this keys to be loaded. */
669 dictType keylistDictType = {
670     dictObjHash,                /* hash function */
671     NULL,                       /* key dup */
672     NULL,                       /* val dup */
673     dictObjKeyCompare,          /* key compare */
674     dictObjectDestructor,       /* key destructor */
675     dictListDestructor          /* val destructor */
676 };
677 
678 /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
679  * clusterNode structures. */
680 dictType clusterNodesDictType = {
681     dictSdsHash,                /* hash function */
682     NULL,                       /* key dup */
683     NULL,                       /* val dup */
684     dictSdsKeyCompare,          /* key compare */
685     dictSdsDestructor,          /* key destructor */
686     NULL                        /* val destructor */
687 };
688 
689 /* Cluster re-addition blacklist. This maps node IDs to the time
690  * we can re-add this node. The goal is to avoid readding a removed
691  * node for some time. */
692 dictType clusterNodesBlackListDictType = {
693     dictSdsCaseHash,            /* hash function */
694     NULL,                       /* key dup */
695     NULL,                       /* val dup */
696     dictSdsKeyCaseCompare,      /* key compare */
697     dictSdsDestructor,          /* key destructor */
698     NULL                        /* val destructor */
699 };
700 
701 /* Cluster re-addition blacklist. This maps node IDs to the time
702  * we can re-add this node. The goal is to avoid readding a removed
703  * node for some time. */
704 dictType modulesDictType = {
705     dictSdsCaseHash,            /* hash function */
706     NULL,                       /* key dup */
707     NULL,                       /* val dup */
708     dictSdsKeyCaseCompare,      /* key compare */
709     dictSdsDestructor,          /* key destructor */
710     NULL                        /* val destructor */
711 };
712 
713 /* Migrate cache dict type. */
714 dictType migrateCacheDictType = {
715     dictSdsHash,                /* hash function */
716     NULL,                       /* key dup */
717     NULL,                       /* val dup */
718     dictSdsKeyCompare,          /* key compare */
719     dictSdsDestructor,          /* key destructor */
720     NULL                        /* val destructor */
721 };
722 
723 /* Replication cached script dict (server.repl_scriptcache_dict).
724  * Keys are sds SHA1 strings, while values are not used at all in the current
725  * implementation. */
726 dictType replScriptCacheDictType = {
727     dictSdsCaseHash,            /* hash function */
728     NULL,                       /* key dup */
729     NULL,                       /* val dup */
730     dictSdsKeyCaseCompare,      /* key compare */
731     dictSdsDestructor,          /* key destructor */
732     NULL                        /* val destructor */
733 };
734 
/* Return 1 when 'dict' has more than DICT_HT_INITIAL_SIZE slots and its fill
 * percentage (entries * 100 / slots) is below HASHTABLE_MIN_FILL, i.e. when
 * resizing it would reclaim memory; return 0 otherwise. */
int htNeedsResize(dict *dict) {
    long long slots = dictSlots(dict);
    long long entries = dictSize(dict);

    /* Checked before dividing: with a non-negative DICT_HT_INITIAL_SIZE this
     * also guarantees 'slots' is non-zero below. */
    if (slots <= DICT_HT_INITIAL_SIZE) return 0;
    return (entries*100/slots) < HASHTABLE_MIN_FILL;
}
743 
744 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
745  * we resize the hash table to save memory */
tryResizeHashTables(int dbid)746 void tryResizeHashTables(int dbid) {
747     if (htNeedsResize(server.db[dbid].dict))
748         dictResize(server.db[dbid].dict);
749     if (htNeedsResize(server.db[dbid].expires))
750         dictResize(server.db[dbid].expires);
751 }
752 
753 /* Our hash table implementation performs rehashing incrementally while
754  * we write/read from the hash table. Still if the server is idle, the hash
755  * table will use two tables for a long time. So we try to use 1 millisecond
756  * of CPU time at every call of this function to perform some rehahsing.
757  *
758  * The function returns 1 if some rehashing was performed, otherwise 0
759  * is returned. */
incrementallyRehash(int dbid)760 int incrementallyRehash(int dbid) {
761     /* Keys dictionary */
762     if (dictIsRehashing(server.db[dbid].dict)) {
763         dictRehashMilliseconds(server.db[dbid].dict,1);
764         return 1; /* already used our millisecond for this loop... */
765     }
766     /* Expires */
767     if (dictIsRehashing(server.db[dbid].expires)) {
768         dictRehashMilliseconds(server.db[dbid].expires,1);
769         return 1; /* already used our millisecond for this loop... */
770     }
771     return 0;
772 }
773 
774 /* This function is called once a background process of some kind terminates,
775  * as we want to avoid resizing the hash tables when there is a child in order
776  * to play well with copy-on-write (otherwise when a resize happens lots of
777  * memory pages are copied). The goal of this function is to update the ability
778  * for dict.c to resize the hash tables accordingly to the fact we have o not
779  * running childs. */
updateDictResizePolicy(void)780 void updateDictResizePolicy(void) {
781     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
782         dictEnableResize();
783     else
784         dictDisableResize();
785 }
786 
/* ============ Cron: called server.hz times per second (every 100 ms by default) ============ */
788 
789 /* Add a sample to the operations per second array of samples. */
trackInstantaneousMetric(int metric,long long current_reading)790 void trackInstantaneousMetric(int metric, long long current_reading) {
791     long long t = mstime() - server.inst_metric[metric].last_sample_time;
792     long long ops = current_reading -
793                     server.inst_metric[metric].last_sample_count;
794     long long ops_sec;
795 
796     ops_sec = t > 0 ? (ops*1000/t) : 0;
797 
798     server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
799         ops_sec;
800     server.inst_metric[metric].idx++;
801     server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
802     server.inst_metric[metric].last_sample_time = mstime();
803     server.inst_metric[metric].last_sample_count = current_reading;
804 }
805 
806 /* Return the mean of all the samples. */
getInstantaneousMetric(int metric)807 long long getInstantaneousMetric(int metric) {
808     int j;
809     long long sum = 0;
810 
811     for (j = 0; j < STATS_METRIC_SAMPLES; j++)
812         sum += server.inst_metric[metric].samples[j];
813     return sum / STATS_METRIC_SAMPLES;
814 }
815 
816 /* Check for timeouts. Returns non-zero if the client was terminated.
817  * The function gets the current time in milliseconds as argument since
818  * it gets called multiple times in a loop, so calling gettimeofday() for
819  * each iteration would be costly without any actual gain. */
clientsCronHandleTimeout(client * c,mstime_t now_ms)820 int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
821     time_t now = now_ms/1000;
822 
823     if (server.maxidletime &&
824         !(c->flags & CLIENT_SLAVE) &&    /* no timeout for slaves */
825         !(c->flags & CLIENT_MASTER) &&   /* no timeout for masters */
826         !(c->flags & CLIENT_BLOCKED) &&  /* no timeout for BLPOP */
827         !(c->flags & CLIENT_PUBSUB) &&   /* no timeout for Pub/Sub clients */
828         (now - c->lastinteraction > server.maxidletime))
829     {
830         serverLog(LL_VERBOSE,"Closing idle client");
831         freeClient(c);
832         return 1;
833     } else if (c->flags & CLIENT_BLOCKED) {
834         /* Blocked OPS timeout is handled with milliseconds resolution.
835          * However note that the actual resolution is limited by
836          * server.hz. */
837 
838         if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
839             /* Handle blocking operation specific timeout. */
840             replyToBlockedClientTimedOut(c);
841             unblockClient(c);
842         } else if (server.cluster_enabled) {
843             /* Cluster: handle unblock & redirect of clients blocked
844              * into keys no longer served by this server. */
845             if (clusterRedirectBlockedClientIfNeeded(c))
846                 unblockClient(c);
847         }
848     }
849     return 0;
850 }
851 
852 /* The client query buffer is an sds.c string that can end with a lot of
853  * free space not used, this function reclaims space if needed.
854  *
855  * The function always returns 0 as it never terminates the client. */
clientsCronResizeQueryBuffer(client * c)856 int clientsCronResizeQueryBuffer(client *c) {
857     size_t querybuf_size = sdsAllocSize(c->querybuf);
858     time_t idletime = server.unixtime - c->lastinteraction;
859 
860     /* There are two conditions to resize the query buffer:
861      * 1) Query buffer is > BIG_ARG and too big for latest peak.
862      * 2) Query buffer is > BIG_ARG and client is idle. */
863     if (querybuf_size > PROTO_MBULK_BIG_ARG &&
864          ((querybuf_size/(c->querybuf_peak+1)) > 2 ||
865           idletime > 2))
866     {
867         /* Only resize the query buffer if it is actually wasting
868          * at least a few kbytes. */
869         if (sdsavail(c->querybuf) > 1024*4) {
870             c->querybuf = sdsRemoveFreeSpace(c->querybuf);
871         }
872     }
873     /* Reset the peak again to capture the peak memory usage in the next
874      * cycle. */
875     c->querybuf_peak = 0;
876 
877     /* Clients representing masters also use a "pending query buffer" that
878      * is the yet not applied part of the stream we are reading. Such buffer
879      * also needs resizing from time to time, otherwise after a very large
880      * transfer (a huge value or a big MIGRATE operation) it will keep using
881      * a lot of memory. */
882     if (c->flags & CLIENT_MASTER) {
883         /* There are two conditions to resize the pending query buffer:
884          * 1) Pending Query buffer is > LIMIT_PENDING_QUERYBUF.
885          * 2) Used length is smaller than pending_querybuf_size/2 */
886         size_t pending_querybuf_size = sdsAllocSize(c->pending_querybuf);
887         if(pending_querybuf_size > LIMIT_PENDING_QUERYBUF &&
888            sdslen(c->pending_querybuf) < (pending_querybuf_size/2))
889         {
890             c->pending_querybuf = sdsRemoveFreeSpace(c->pending_querybuf);
891         }
892     }
893     return 0;
894 }
895 
/* Recent peak memory usage of client buffers.
 *
 * INFO (clients section) reports the biggest client input and output buffers
 * seen in the last few seconds. To avoid an O(N) scan over all clients, we
 * keep CLIENTS_PEAK_MEM_USAGE_SLOTS slots. Each slot covers one recent second
 * and is selected by UNIXTIME % CLIENTS_PEAK_MEM_USAGE_SLOTS. It records the
 * largest input buffer and the largest output buffer observed while that
 * second was current.
 *
 * clientsCronTrackExpansiveClients() fills the slots, and
 * getExpansiveClientsInfo() reports the maximum over all of them. */
#define CLIENTS_PEAK_MEM_USAGE_SLOTS 8
size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS];   /* per-slot max input buffer */
size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS];  /* per-slot max output buffer */
911 
/* Record client 'c''s query buffer allocation and output buffer usage into
 * the current second's slot of ClientsPeakMemInput/ClientsPeakMemOutput,
 * keeping the maximum seen in that slot.
 *
 * The slot after the current one is cleared on every call, so when the clock
 * moves to it, it only collects samples from its own second.
 *
 * If serverCron() is delayed (for example by a command running for several
 * seconds), the index may jump, and some slots may hold data older than
 * CLIENTS_PEAK_MEM_USAGE_SLOTS seconds. That is acceptable: the goal is only
 * to report whether there were memory-expensive clients "recently".
 *
 * Always returns 0: this function never frees the client. */
int clientsCronTrackExpansiveClients(client *c) {
    size_t in_bytes = sdsAllocSize(c->querybuf);
    size_t out_bytes = getClientOutputBufferMemoryUsage(c);
    int slot = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
    int upcoming = (slot+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;

    ClientsPeakMemInput[upcoming] = 0;
    ClientsPeakMemOutput[upcoming] = 0;

    if (ClientsPeakMemInput[slot] < in_bytes)
        ClientsPeakMemInput[slot] = in_bytes;
    if (ClientsPeakMemOutput[slot] < out_bytes)
        ClientsPeakMemOutput[slot] = out_bytes;

    return 0;
}
938 
939 /* Return the max samples in the memory usage of clients tracked by
940  * the function clientsCronTrackExpansiveClients(). */
getExpansiveClientsInfo(size_t * in_usage,size_t * out_usage)941 void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
942     size_t i = 0, o = 0;
943     for (int j = 0; j < CLIENTS_PEAK_MEM_USAGE_SLOTS; j++) {
944         if (ClientsPeakMemInput[j] > i) i = ClientsPeakMemInput[j];
945         if (ClientsPeakMemOutput[j] > o) o = ClientsPeakMemOutput[j];
946     }
947     *in_usage = i;
948     *out_usage = o;
949 }
950 
951 /* This function is called by serverCron() and is used in order to perform
952  * operations on clients that are important to perform constantly. For instance
953  * we use this function in order to disconnect clients after a timeout, including
954  * clients blocked in some blocking command with a non-zero timeout.
955  *
956  * The function makes some effort to process all the clients every second, even
957  * if this cannot be strictly guaranteed, since serverCron() may be called with
958  * an actual frequency lower than server.hz in case of latency events like slow
959  * commands.
960  *
961  * It is very important for this function, and the functions it calls, to be
962  * very fast: sometimes Redis has tens of hundreds of connected clients, and the
963  * default server.hz value is 10, so sometimes here we need to process thousands
964  * of clients per second, turning this function into a source of latency.
965  */
966 #define CLIENTS_CRON_MIN_ITERATIONS 5
clientsCron(void)967 void clientsCron(void) {
968     /* Try to process at least numclients/server.hz of clients
969      * per call. Since normally (if there are no big latency events) this
970      * function is called server.hz times per second, in the average case we
971      * process all the clients in 1 second. */
972     int numclients = listLength(server.clients);
973     int iterations = numclients/server.hz;
974     mstime_t now = mstime();
975 
976     /* Process at least a few clients while we are at it, even if we need
977      * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
978      * of processing each client once per second. */
979     if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
980         iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
981                      numclients : CLIENTS_CRON_MIN_ITERATIONS;
982 
983     while(listLength(server.clients) && iterations--) {
984         client *c;
985         listNode *head;
986 
987         /* Rotate the list, take the current head, process.
988          * This way if the client must be removed from the list it's the
989          * first element and we don't incur into O(N) computation. */
990         listRotate(server.clients);
991         head = listFirst(server.clients);
992         c = listNodeValue(head);
993         /* The following functions do different service checks on the client.
994          * The protocol is that they return non-zero if the client was
995          * terminated. */
996         if (clientsCronHandleTimeout(c,now)) continue;
997         if (clientsCronResizeQueryBuffer(c)) continue;
998         if (clientsCronTrackExpansiveClients(c)) continue;
999     }
1000 }
1001 
1002 /* This function handles 'background' operations we are required to do
1003  * incrementally in Redis databases, such as active key expiring, resizing,
1004  * rehashing. */
databasesCron(void)1005 void databasesCron(void) {
1006     /* Expire keys by random sampling. Not required for slaves
1007      * as master will synthesize DELs for us. */
1008     if (server.active_expire_enabled) {
1009         if (server.masterhost == NULL) {
1010             activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
1011         } else {
1012             expireSlaveKeys();
1013         }
1014     }
1015 
1016     /* Defrag keys gradually. */
1017     if (server.active_defrag_enabled)
1018         activeDefragCycle();
1019 
1020     /* Perform hash tables rehashing if needed, but only if there are no
1021      * other processes saving the DB on disk. Otherwise rehashing is bad
1022      * as will cause a lot of copy-on-write of memory pages. */
1023     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
1024         /* We use global counters so if we stop the computation at a given
1025          * DB we'll be able to start from the successive in the next
1026          * cron loop iteration. */
1027         static unsigned int resize_db = 0;
1028         static unsigned int rehash_db = 0;
1029         int dbs_per_call = CRON_DBS_PER_CALL;
1030         int j;
1031 
1032         /* Don't test more DBs than we have. */
1033         if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
1034 
1035         /* Resize */
1036         for (j = 0; j < dbs_per_call; j++) {
1037             tryResizeHashTables(resize_db % server.dbnum);
1038             resize_db++;
1039         }
1040 
1041         /* Rehash */
1042         if (server.activerehashing) {
1043             for (j = 0; j < dbs_per_call; j++) {
1044                 int work_done = incrementallyRehash(rehash_db);
1045                 if (work_done) {
1046                     /* If the function did some work, stop here, we'll do
1047                      * more at the next cron loop. */
1048                     break;
1049                 } else {
1050                     /* If this db didn't need rehash, we'll try the next one. */
1051                     rehash_db++;
1052                     rehash_db %= server.dbnum;
1053                 }
1054             }
1055         }
1056     }
1057 }
1058 
1059 /* We take a cached value of the unix time in the global state because with
1060  * virtual memory and aging there is to store the current time in objects at
1061  * every object access, and accuracy is not needed. To access a global var is
1062  * a lot faster than calling time(NULL) */
updateCachedTime(void)1063 void updateCachedTime(void) {
1064     time_t unixtime = time(NULL);
1065     atomicSet(server.unixtime,unixtime);
1066     server.mstime = mstime();
1067 
1068     /* To get information about daylight saving time, we need to call localtime_r
1069      * and cache the result. However calling localtime_r in this context is safe
1070      * since we will never fork() while here, in the main thread. The logging
1071      * function will call a thread safe version of localtime that has no locks. */
1072     struct tm tm;
1073     localtime_r(&server.unixtime,&tm);
1074     server.daylight_active = tm.tm_isdst;
1075 }
1076 
1077 /* This is our timer interrupt, called server.hz times per second.
1078  * Here is where we do a number of things that need to be done asynchronously.
1079  * For instance:
1080  *
1081  * - Active expired keys collection (it is also performed in a lazy way on
1082  *   lookup).
1083  * - Software watchdog.
1084  * - Update some statistic.
1085  * - Incremental rehashing of the DBs hash tables.
1086  * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
1087  * - Clients timeout of different kinds.
1088  * - Replication reconnection.
1089  * - Many more...
1090  *
1091  * Everything directly called here will be called server.hz times per second,
1092  * so in order to throttle execution of things we want to do less frequently
1093  * a macro is used: run_with_period(milliseconds) { .... }
1094  */
1095 
serverCron(struct aeEventLoop * eventLoop,long long id,void * clientData)1096 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1097     int j;
1098     UNUSED(eventLoop);
1099     UNUSED(id);
1100     UNUSED(clientData);
1101 
1102     /* Software watchdog: deliver the SIGALRM that will reach the signal
1103      * handler if we don't return here fast enough. */
1104     if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
1105 
1106     /* Update the time cache. */
1107     updateCachedTime();
1108 
1109     server.hz = server.config_hz;
1110     /* Adapt the server.hz value to the number of configured clients. If we have
1111      * many clients, we want to call serverCron() with an higher frequency. */
1112     if (server.dynamic_hz) {
1113         while (listLength(server.clients) / server.hz >
1114                MAX_CLIENTS_PER_CLOCK_TICK)
1115         {
1116             server.hz *= 2;
1117             if (server.hz > CONFIG_MAX_HZ) {
1118                 server.hz = CONFIG_MAX_HZ;
1119                 break;
1120             }
1121         }
1122     }
1123 
1124     run_with_period(100) {
1125         trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
1126         trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
1127                 server.stat_net_input_bytes);
1128         trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
1129                 server.stat_net_output_bytes);
1130     }
1131 
1132     /* We have just LRU_BITS bits per object for LRU information.
1133      * So we use an (eventually wrapping) LRU clock.
1134      *
1135      * Note that even if the counter wraps it's not a big problem,
1136      * everything will still work but some object will appear younger
1137      * to Redis. However for this to happen a given object should never be
1138      * touched for all the time needed to the counter to wrap, which is
1139      * not likely.
1140      *
1141      * Note that you can change the resolution altering the
1142      * LRU_CLOCK_RESOLUTION define. */
1143     unsigned long lruclock = getLRUClock();
1144     atomicSet(server.lruclock,lruclock);
1145 
1146     /* Record the max memory used since the server was started. */
1147     if (zmalloc_used_memory() > server.stat_peak_memory)
1148         server.stat_peak_memory = zmalloc_used_memory();
1149 
1150     run_with_period(100) {
1151         /* Sample the RSS and other metrics here since this is a relatively slow call.
1152          * We must sample the zmalloc_used at the same time we take the rss, otherwise
1153          * the frag ratio calculate may be off (ratio of two samples at different times) */
1154         server.cron_malloc_stats.process_rss = zmalloc_get_rss();
1155         server.cron_malloc_stats.zmalloc_used = zmalloc_used_memory();
1156         /* Sampling the allcator info can be slow too.
1157          * The fragmentation ratio it'll show is potentically more accurate
1158          * it excludes other RSS pages such as: shared libraries, LUA and other non-zmalloc
1159          * allocations, and allocator reserved pages that can be pursed (all not actual frag) */
1160         zmalloc_get_allocator_info(&server.cron_malloc_stats.allocator_allocated,
1161                                    &server.cron_malloc_stats.allocator_active,
1162                                    &server.cron_malloc_stats.allocator_resident);
1163         /* in case the allocator isn't providing these stats, fake them so that
1164          * fragmention info still shows some (inaccurate metrics) */
1165         if (!server.cron_malloc_stats.allocator_resident) {
1166             /* LUA memory isn't part of zmalloc_used, but it is part of the process RSS,
1167              * so we must desuct it in order to be able to calculate correct
1168              * "allocator fragmentation" ratio */
1169             size_t lua_memory = lua_gc(server.lua,LUA_GCCOUNT,0)*1024LL;
1170             server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory;
1171         }
1172         if (!server.cron_malloc_stats.allocator_active)
1173             server.cron_malloc_stats.allocator_active = server.cron_malloc_stats.allocator_resident;
1174         if (!server.cron_malloc_stats.allocator_allocated)
1175             server.cron_malloc_stats.allocator_allocated = server.cron_malloc_stats.zmalloc_used;
1176     }
1177 
1178     /* We received a SIGTERM, shutting down here in a safe way, as it is
1179      * not ok doing so inside the signal handler. */
1180     if (server.shutdown_asap) {
1181         if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
1182         serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
1183         server.shutdown_asap = 0;
1184     }
1185 
1186     /* Show some info about non-empty databases */
1187     run_with_period(5000) {
1188         for (j = 0; j < server.dbnum; j++) {
1189             long long size, used, vkeys;
1190 
1191             size = dictSlots(server.db[j].dict);
1192             used = dictSize(server.db[j].dict);
1193             vkeys = dictSize(server.db[j].expires);
1194             if (used || vkeys) {
1195                 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1196                 /* dictPrintStats(server.dict); */
1197             }
1198         }
1199     }
1200 
1201     /* Show information about connected clients */
1202     if (!server.sentinel_mode) {
1203         run_with_period(5000) {
1204             serverLog(LL_VERBOSE,
1205                 "%lu clients connected (%lu replicas), %zu bytes in use",
1206                 listLength(server.clients)-listLength(server.slaves),
1207                 listLength(server.slaves),
1208                 zmalloc_used_memory());
1209         }
1210     }
1211 
1212     /* We need to do a few operations on clients asynchronously. */
1213     clientsCron();
1214 
1215     /* Handle background operations on Redis databases. */
1216     databasesCron();
1217 
1218     /* Start a scheduled AOF rewrite if this was requested by the user while
1219      * a BGSAVE was in progress. */
1220     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
1221         server.aof_rewrite_scheduled)
1222     {
1223         rewriteAppendOnlyFileBackground();
1224     }
1225 
1226     /* Check if a background saving or AOF rewrite in progress terminated. */
1227     if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
1228         ldbPendingChildren())
1229     {
1230         int statloc;
1231         pid_t pid;
1232 
1233         if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1234             int exitcode = WEXITSTATUS(statloc);
1235             int bysignal = 0;
1236 
1237             if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
1238 
1239             if (pid == -1) {
1240                 serverLog(LL_WARNING,"wait3() returned an error: %s. "
1241                     "rdb_child_pid = %d, aof_child_pid = %d",
1242                     strerror(errno),
1243                     (int) server.rdb_child_pid,
1244                     (int) server.aof_child_pid);
1245             } else if (pid == server.rdb_child_pid) {
1246                 backgroundSaveDoneHandler(exitcode,bysignal);
1247                 if (!bysignal && exitcode == 0) receiveChildInfo();
1248             } else if (pid == server.aof_child_pid) {
1249                 backgroundRewriteDoneHandler(exitcode,bysignal);
1250                 if (!bysignal && exitcode == 0) receiveChildInfo();
1251             } else {
1252                 if (!ldbRemoveChild(pid)) {
1253                     serverLog(LL_WARNING,
1254                         "Warning, detected child with unmatched pid: %ld",
1255                         (long)pid);
1256                 }
1257             }
1258             updateDictResizePolicy();
1259             closeChildInfoPipe();
1260         }
1261     } else {
1262         /* If there is not a background saving/rewrite in progress check if
1263          * we have to save/rewrite now. */
1264         for (j = 0; j < server.saveparamslen; j++) {
1265             struct saveparam *sp = server.saveparams+j;
1266 
1267             /* Save if we reached the given amount of changes,
1268              * the given amount of seconds, and if the latest bgsave was
1269              * successful or if, in case of an error, at least
1270              * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
1271             if (server.dirty >= sp->changes &&
1272                 server.unixtime-server.lastsave > sp->seconds &&
1273                 (server.unixtime-server.lastbgsave_try >
1274                  CONFIG_BGSAVE_RETRY_DELAY ||
1275                  server.lastbgsave_status == C_OK))
1276             {
1277                 serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
1278                     sp->changes, (int)sp->seconds);
1279                 rdbSaveInfo rsi, *rsiptr;
1280                 rsiptr = rdbPopulateSaveInfo(&rsi);
1281                 rdbSaveBackground(server.rdb_filename,rsiptr);
1282                 break;
1283             }
1284         }
1285 
1286         /* Trigger an AOF rewrite if needed. */
1287         if (server.aof_state == AOF_ON &&
1288             server.rdb_child_pid == -1 &&
1289             server.aof_child_pid == -1 &&
1290             server.aof_rewrite_perc &&
1291             server.aof_current_size > server.aof_rewrite_min_size)
1292         {
1293             long long base = server.aof_rewrite_base_size ?
1294                 server.aof_rewrite_base_size : 1;
1295             long long growth = (server.aof_current_size*100/base) - 100;
1296             if (growth >= server.aof_rewrite_perc) {
1297                 serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
1298                 rewriteAppendOnlyFileBackground();
1299             }
1300         }
1301     }
1302 
1303 
1304     /* AOF postponed flush: Try at every cron cycle if the slow fsync
1305      * completed. */
1306     if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
1307 
1308     /* AOF write errors: in this case we have a buffer to flush as well and
1309      * clear the AOF error in case of success to make the DB writable again,
1310      * however to try every second is enough in case of 'hz' is set to
1311      * an higher frequency. */
1312     run_with_period(1000) {
1313         if (server.aof_last_write_status == C_ERR)
1314             flushAppendOnlyFile(0);
1315     }
1316 
1317     /* Close clients that need to be closed asynchronous */
1318     freeClientsInAsyncFreeQueue();
1319 
1320     /* Clear the paused clients flag if needed. */
1321     clientsArePaused(); /* Don't check return value, just use the side effect.*/
1322 
1323     /* Replication cron function -- used to reconnect to master,
1324      * detect transfer failures, start background RDB transfers and so forth. */
1325     run_with_period(1000) replicationCron();
1326 
1327     /* Run the Redis Cluster cron. */
1328     run_with_period(100) {
1329         if (server.cluster_enabled) clusterCron();
1330     }
1331 
1332     /* Run the Sentinel timer if we are in sentinel mode. */
1333     if (server.sentinel_mode) sentinelTimer();
1334 
1335     /* Cleanup expired MIGRATE cached sockets. */
1336     run_with_period(1000) {
1337         migrateCloseTimedoutSockets();
1338     }
1339 
1340     /* Start a scheduled BGSAVE if the corresponding flag is set. This is
1341      * useful when we are forced to postpone a BGSAVE because an AOF
1342      * rewrite is in progress.
1343      *
1344      * Note: this code must be after the replicationCron() call above so
1345      * make sure when refactoring this file to keep this order. This is useful
1346      * because we want to give priority to RDB savings for replication. */
1347     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
1348         server.rdb_bgsave_scheduled &&
1349         (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
1350          server.lastbgsave_status == C_OK))
1351     {
1352         rdbSaveInfo rsi, *rsiptr;
1353         rsiptr = rdbPopulateSaveInfo(&rsi);
1354         if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
1355             server.rdb_bgsave_scheduled = 0;
1356     }
1357 
1358     server.cronloops++;
1359     return 1000/server.hz;
1360 }
1361 
1362 /* This function gets called every time Redis is entering the
1363  * main loop of the event driven library, that is, before to sleep
1364  * for ready file descriptors. */
beforeSleep(struct aeEventLoop * eventLoop)1365 void beforeSleep(struct aeEventLoop *eventLoop) {
1366     UNUSED(eventLoop);
1367 
1368     /* Call the Redis Cluster before sleep function. Note that this function
1369      * may change the state of Redis Cluster (from ok to fail or vice versa),
1370      * so it's a good idea to call it before serving the unblocked clients
1371      * later in this function. */
1372     if (server.cluster_enabled) clusterBeforeSleep();
1373 
1374     /* Run a fast expire cycle (the called function will return
1375      * ASAP if a fast cycle is not needed). */
1376     if (server.active_expire_enabled && server.masterhost == NULL)
1377         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
1378 
1379     /* Send all the slaves an ACK request if at least one client blocked
1380      * during the previous event loop iteration. */
1381     if (server.get_ack_from_slaves) {
1382         robj *argv[3];
1383 
1384         argv[0] = createStringObject("REPLCONF",8);
1385         argv[1] = createStringObject("GETACK",6);
1386         argv[2] = createStringObject("*",1); /* Not used argument. */
1387         replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
1388         decrRefCount(argv[0]);
1389         decrRefCount(argv[1]);
1390         decrRefCount(argv[2]);
1391         server.get_ack_from_slaves = 0;
1392     }
1393 
1394     /* Unblock all the clients blocked for synchronous replication
1395      * in WAIT. */
1396     if (listLength(server.clients_waiting_acks))
1397         processClientsWaitingReplicas();
1398 
1399     /* Check if there are clients unblocked by modules that implement
1400      * blocking commands. */
1401     moduleHandleBlockedClients();
1402 
1403     /* Try to process pending commands for clients that were just unblocked. */
1404     if (listLength(server.unblocked_clients))
1405         processUnblockedClients();
1406 
1407     /* Write the AOF buffer on disk */
1408     flushAppendOnlyFile(0);
1409 
1410     /* Handle writes with pending output buffers. */
1411     handleClientsWithPendingWrites();
1412 
1413     /* Before we are going to sleep, let the threads access the dataset by
1414      * releasing the GIL. Redis main thread will not touch anything at this
1415      * time. */
1416     if (moduleCount()) moduleReleaseGIL();
1417 }
1418 
/* Hook invoked immediately after the event loop's multiplexing API returns,
 * right before control goes back to Redis through the fired event
 * callbacks. When any module is loaded it re-acquires the module GIL, which
 * beforeSleep() releases under the same condition. */
void afterSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);
    if (moduleCount() == 0) return; /* No modules loaded: no GIL to take. */
    moduleAcquireGIL();
}
1426 
1427 /* =========================== Server initialization ======================== */
1428 
createSharedObjects(void)1429 void createSharedObjects(void) {
1430     int j;
1431 
1432     shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
1433     shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
1434     shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
1435     shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
1436     shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
1437     shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
1438     shared.cnegone = createObject(OBJ_STRING,sdsnew(":-1\r\n"));
1439     shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
1440     shared.nullmultibulk = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
1441     shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n"));
1442     shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
1443     shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
1444     shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
1445     shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
1446         "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
1447     shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
1448         "-ERR no such key\r\n"));
1449     shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
1450         "-ERR syntax error\r\n"));
1451     shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
1452         "-ERR source and destination objects are the same\r\n"));
1453     shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
1454         "-ERR index out of range\r\n"));
1455     shared.noscripterr = createObject(OBJ_STRING,sdsnew(
1456         "-NOSCRIPT No matching script. Please use EVAL.\r\n"));
1457     shared.loadingerr = createObject(OBJ_STRING,sdsnew(
1458         "-LOADING Redis is loading the dataset in memory\r\n"));
1459     shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
1460         "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
1461     shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
1462         "-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n"));
1463     shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
1464         "-MISCONF Redis is configured to save RDB snapshots, but it is currently not able to persist on disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.\r\n"));
1465     shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
1466         "-READONLY You can't write against a read only replica.\r\n"));
1467     shared.noautherr = createObject(OBJ_STRING,sdsnew(
1468         "-NOAUTH Authentication required.\r\n"));
1469     shared.oomerr = createObject(OBJ_STRING,sdsnew(
1470         "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
1471     shared.execaborterr = createObject(OBJ_STRING,sdsnew(
1472         "-EXECABORT Transaction discarded because of previous errors.\r\n"));
1473     shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
1474         "-NOREPLICAS Not enough good replicas to write.\r\n"));
1475     shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
1476         "-BUSYKEY Target key name already exists.\r\n"));
1477     shared.space = createObject(OBJ_STRING,sdsnew(" "));
1478     shared.colon = createObject(OBJ_STRING,sdsnew(":"));
1479     shared.plus = createObject(OBJ_STRING,sdsnew("+"));
1480 
1481     for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
1482         char dictid_str[64];
1483         int dictid_len;
1484 
1485         dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
1486         shared.select[j] = createObject(OBJ_STRING,
1487             sdscatprintf(sdsempty(),
1488                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
1489                 dictid_len, dictid_str));
1490     }
1491     shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
1492     shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
1493     shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
1494     shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
1495     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
1496     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
1497     shared.del = createStringObject("DEL",3);
1498     shared.unlink = createStringObject("UNLINK",6);
1499     shared.rpop = createStringObject("RPOP",4);
1500     shared.lpop = createStringObject("LPOP",4);
1501     shared.lpush = createStringObject("LPUSH",5);
1502     shared.rpoplpush = createStringObject("RPOPLPUSH",9);
1503     shared.zpopmin = createStringObject("ZPOPMIN",7);
1504     shared.zpopmax = createStringObject("ZPOPMAX",7);
1505     for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
1506         shared.integers[j] =
1507             makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
1508         shared.integers[j]->encoding = OBJ_ENCODING_INT;
1509     }
1510     for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {
1511         shared.mbulkhdr[j] = createObject(OBJ_STRING,
1512             sdscatprintf(sdsempty(),"*%d\r\n",j));
1513         shared.bulkhdr[j] = createObject(OBJ_STRING,
1514             sdscatprintf(sdsempty(),"$%d\r\n",j));
1515     }
1516     /* The following two shared objects, minstring and maxstrings, are not
1517      * actually used for their value but as a special object meaning
1518      * respectively the minimum possible string and the maximum possible
1519      * string in string comparisons for the ZRANGEBYLEX command. */
1520     shared.minstring = sdsnew("minstring");
1521     shared.maxstring = sdsnew("maxstring");
1522 }
1523 
/* Fill the global 'server' structure with the built-in default configuration.
 *
 * Besides plain scalar defaults, this function also:
 *  - initializes the next_client_id, lruclock and unixtime mutexes;
 *  - generates a random run ID and sets up the replication IDs;
 *  - zstrdup()s the default logfile, syslog ident, RDB/AOF file names and
 *    cluster config file name;
 *  - creates the MIGRATE cached-sockets dict;
 *  - installs the default RDB save points;
 *  - builds the command table and caches pointers to frequently used
 *    commands.
 *
 * As the command-table note below says, some of these values (e.g. command
 * names via rename-command) may later be changed through redis.conf.
 * Statement order matters in places: the cached time is refreshed first, the
 * save points are reset before the defaults are appended, and the command
 * table is populated before any lookupCommandByCString() call. */
void initServerConfig(void) {
    int j;

    /* Mutexes associated (by name) with the client ID counter, the LRU
     * clock and the cached unixtime. */
    pthread_mutex_init(&server.next_client_id_mutex,NULL);
    pthread_mutex_init(&server.lruclock_mutex,NULL);
    pthread_mutex_init(&server.unixtime_mutex,NULL);

    /* Refresh the cached clock, generate a run ID of CONFIG_RUN_ID_SIZE
     * random hex characters (NUL-terminated explicitly), then set up the
     * replication IDs. */
    updateCachedTime();
    getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
    server.runid[CONFIG_RUN_ID_SIZE] = '\0';
    changeReplicationId();
    clearReplicationId2();
    server.timezone = getTimeZone(); /* Initialized by tzset(). */
    server.configfile = NULL;
    server.executable = NULL;
    server.hz = server.config_hz = CONFIG_DEFAULT_HZ;
    server.dynamic_hz = CONFIG_DEFAULT_DYNAMIC_HZ;
    server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
    server.port = CONFIG_DEFAULT_SERVER_PORT;
    server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
    server.bindaddr_count = 0;
    server.unixsocket = NULL;
    server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
    server.ipfd_count = 0;
    server.sofd = -1;
    server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE;
    server.dbnum = CONFIG_DEFAULT_DBNUM;
    server.verbosity = CONFIG_DEFAULT_VERBOSITY;
    server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
    server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE;
    server.active_expire_enabled = 1;
    server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG;
    server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES;
    server.active_defrag_threshold_lower = CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER;
    server.active_defrag_threshold_upper = CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER;
    server.active_defrag_cycle_min = CONFIG_DEFAULT_DEFRAG_CYCLE_MIN;
    server.active_defrag_cycle_max = CONFIG_DEFAULT_DEFRAG_CYCLE_MAX;
    server.active_defrag_max_scan_fields = CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS;
    server.proto_max_bulk_len = CONFIG_DEFAULT_PROTO_MAX_BULK_LEN;
    server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
    server.saveparams = NULL;
    server.loading = 0;
    server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);
    server.syslog_enabled = CONFIG_DEFAULT_SYSLOG_ENABLED;
    server.syslog_ident = zstrdup(CONFIG_DEFAULT_SYSLOG_IDENT);
    server.syslog_facility = LOG_LOCAL0;
    server.daemonize = CONFIG_DEFAULT_DAEMONIZE;
    server.supervised = 0;
    server.supervised_mode = SUPERVISED_NONE;
    server.aof_state = AOF_OFF;
    server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC;
    server.aof_no_fsync_on_rewrite = CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE;
    server.aof_rewrite_perc = AOF_REWRITE_PERC;
    server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
    server.aof_rewrite_base_size = 0;
    server.aof_rewrite_scheduled = 0;
    server.aof_last_fsync = time(NULL);
    server.aof_rewrite_time_last = -1;
    server.aof_rewrite_time_start = -1;
    server.aof_lastbgrewrite_status = C_OK;
    server.aof_delayed_fsync = 0;
    server.aof_fd = -1;
    server.aof_selected_db = -1; /* Make sure the first time will not match */
    server.aof_flush_postponed_start = 0;
    server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
    server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC;
    server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
    server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
    server.pidfile = NULL;
    server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
    server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
    server.requirepass = NULL;
    server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION;
    server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM;
    server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
    server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING;
    server.active_defrag_running = 0;
    server.notify_keyspace_events = 0;
    server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
    server.blocked_clients = 0;
    memset(server.blocked_clients_by_type,0,
           sizeof(server.blocked_clients_by_type));
    server.maxmemory = CONFIG_DEFAULT_MAXMEMORY;
    server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY;
    server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES;
    server.lfu_log_factor = CONFIG_DEFAULT_LFU_LOG_FACTOR;
    server.lfu_decay_time = CONFIG_DEFAULT_LFU_DECAY_TIME;
    server.hash_max_ziplist_entries = OBJ_HASH_MAX_ZIPLIST_ENTRIES;
    server.hash_max_ziplist_value = OBJ_HASH_MAX_ZIPLIST_VALUE;
    server.list_max_ziplist_size = OBJ_LIST_MAX_ZIPLIST_SIZE;
    server.list_compress_depth = OBJ_LIST_COMPRESS_DEPTH;
    server.set_max_intset_entries = OBJ_SET_MAX_INTSET_ENTRIES;
    server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES;
    server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE;
    server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES;
    server.stream_node_max_bytes = OBJ_STREAM_NODE_MAX_BYTES;
    server.stream_node_max_entries = OBJ_STREAM_NODE_MAX_ENTRIES;
    server.shutdown_asap = 0;
    server.cluster_enabled = 0;
    server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT;
    server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER;
    server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY;
    server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE;
    server.cluster_slave_no_failover = CLUSTER_DEFAULT_SLAVE_NO_FAILOVER;
    server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
    server.cluster_announce_ip = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_IP;
    server.cluster_announce_port = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_PORT;
    server.cluster_announce_bus_port = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_BUS_PORT;
    server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;
    server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
    server.next_client_id = 1; /* Client IDs, start from 1 .*/
    server.loading_process_events_interval_bytes = (1024*1024*2); /* 2 MB */
    server.lazyfree_lazy_eviction = CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION;
    server.lazyfree_lazy_expire = CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE;
    server.lazyfree_lazy_server_del = CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL;
    server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO;
    server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT;

    /* Seed the LRU clock (stored through atomicSet()). */
    unsigned int lruclock = getLRUClock();
    atomicSet(server.lruclock,lruclock);

    /* Reset the RDB save points, then install the three defaults. */
    resetServerSaveParams();

    appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
    appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
    appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */

    /* Replication related */
    server.masterauth = NULL;
    server.masterhost = NULL;
    server.masterport = 6379;
    server.master = NULL;
    server.cached_master = NULL;
    server.master_initial_offset = -1;
    server.repl_state = REPL_STATE_NONE;
    server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
    server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
    server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
    server.repl_slave_ignore_maxmemory = CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY;
    server.repl_slave_lazy_flush = CONFIG_DEFAULT_SLAVE_LAZY_FLUSH;
    server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
    server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
    server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
    server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
    server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
    server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
    server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE;
    server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG;
    server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY;
    server.slave_announce_ip = CONFIG_DEFAULT_SLAVE_ANNOUNCE_IP;
    server.slave_announce_port = CONFIG_DEFAULT_SLAVE_ANNOUNCE_PORT;
    server.master_repl_offset = 0;

    /* Replication partial resync backlog */
    server.repl_backlog = NULL;
    server.repl_backlog_size = CONFIG_DEFAULT_REPL_BACKLOG_SIZE;
    server.repl_backlog_histlen = 0;
    server.repl_backlog_idx = 0;
    server.repl_backlog_off = 0;
    server.repl_backlog_time_limit = CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
    server.repl_no_slaves_since = time(NULL);

    /* Client output buffer limits */
    for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
        server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];

    /* Double constants initialization: +inf, -inf and NaN are obtained by
     * dividing by R_Zero (a variable holding 0.0) under IEEE 754 rules. */
    R_Zero = 0.0;
    R_PosInf = 1.0/R_Zero;
    R_NegInf = -1.0/R_Zero;
    R_Nan = R_Zero/R_Zero;

    /* Command table -- we initialize it here as it is part of the
     * initial configuration, since command names may be changed via
     * redis.conf using the rename-command directive. The lookups below
     * cache pointers to commands referenced elsewhere and must run after
     * populateCommandTable(). */
    server.commands = dictCreate(&commandTableDictType,NULL);
    server.orig_commands = dictCreate(&commandTableDictType,NULL);
    populateCommandTable();
    server.delCommand = lookupCommandByCString("del");
    server.multiCommand = lookupCommandByCString("multi");
    server.lpushCommand = lookupCommandByCString("lpush");
    server.lpopCommand = lookupCommandByCString("lpop");
    server.rpopCommand = lookupCommandByCString("rpop");
    server.zpopminCommand = lookupCommandByCString("zpopmin");
    server.zpopmaxCommand = lookupCommandByCString("zpopmax");
    server.sremCommand = lookupCommandByCString("srem");
    server.execCommand = lookupCommandByCString("exec");
    server.expireCommand = lookupCommandByCString("expire");
    server.pexpireCommand = lookupCommandByCString("pexpire");
    server.xclaimCommand = lookupCommandByCString("xclaim");
    server.xgroupCommand = lookupCommandByCString("xgroup");

    /* Slow log */
    server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;
    server.slowlog_max_len = CONFIG_DEFAULT_SLOWLOG_MAX_LEN;

    /* Latency monitor */
    server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD;

    /* Debugging */
    server.assert_failed = "<no assertion failed>";
    server.assert_file = "<no file>";
    server.assert_line = 0;
    server.bug_report_start = 0;
    server.watchdog_period = 0;

    /* By default we want scripts to be always replicated by effects
     * (single commands executed by the script), and not by sending the
     * script to the slave / AOF. This is the new way starting from
     * Redis 5. However it is possible to revert it via redis.conf. */
    server.lua_always_replicate_commands = 1;
}
1735 
1736 extern char **environ;
1737 
1738 /* Restart the server, executing the same executable that started this
1739  * instance, with the same arguments and configuration file.
1740  *
1741  * The function is designed to directly call execve() so that the new
1742  * server instance will retain the PID of the previous one.
1743  *
1744  * The list of flags, that may be bitwise ORed together, alter the
1745  * behavior of this function:
1746  *
1747  * RESTART_SERVER_NONE              No flags.
1748  * RESTART_SERVER_GRACEFULLY        Do a proper shutdown before restarting.
1749  * RESTART_SERVER_CONFIG_REWRITE    Rewrite the config file before restarting.
1750  *
1751  * On success the function does not return, because the process turns into
1752  * a different process. On error C_ERR is returned. */
restartServer(int flags,mstime_t delay)1753 int restartServer(int flags, mstime_t delay) {
1754     int j;
1755 
1756     /* Check if we still have accesses to the executable that started this
1757      * server instance. */
1758     if (access(server.executable,X_OK) == -1) {
1759         serverLog(LL_WARNING,"Can't restart: this process has no "
1760                              "permissions to execute %s", server.executable);
1761         return C_ERR;
1762     }
1763 
1764     /* Config rewriting. */
1765     if (flags & RESTART_SERVER_CONFIG_REWRITE &&
1766         server.configfile &&
1767         rewriteConfig(server.configfile) == -1)
1768     {
1769         serverLog(LL_WARNING,"Can't restart: configuration rewrite process "
1770                              "failed");
1771         return C_ERR;
1772     }
1773 
1774     /* Perform a proper shutdown. */
1775     if (flags & RESTART_SERVER_GRACEFULLY &&
1776         prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK)
1777     {
1778         serverLog(LL_WARNING,"Can't restart: error preparing for shutdown");
1779         return C_ERR;
1780     }
1781 
1782     /* Close all file descriptors, with the exception of stdin, stdout, strerr
1783      * which are useful if we restart a Redis server which is not daemonized. */
1784     for (j = 3; j < (int)server.maxclients + 1024; j++) {
1785         /* Test the descriptor validity before closing it, otherwise
1786          * Valgrind issues a warning on close(). */
1787         if (fcntl(j,F_GETFD) != -1) close(j);
1788     }
1789 
1790     /* Execute the server with the original command line. */
1791     if (delay) usleep(delay*1000);
1792     zfree(server.exec_argv[0]);
1793     server.exec_argv[0] = zstrdup(server.executable);
1794     execve(server.executable,server.exec_argv,environ);
1795 
1796     /* If an error occurred here, there is nothing we can do, but exit. */
1797     _exit(1);
1798 
1799     return C_ERR; /* Never reached. */
1800 }
1801 
1802 /* This function will try to raise the max number of open files accordingly to
1803  * the configured max number of clients. It also reserves a number of file
1804  * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
1805  * persistence, listening sockets, log files and so forth.
1806  *
1807  * If it will not be possible to set the limit accordingly to the configured
1808  * max number of clients, the function will do the reverse setting
1809  * server.maxclients to the value that we can actually handle. */
adjustOpenFilesLimit(void)1810 void adjustOpenFilesLimit(void) {
1811     rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS;
1812     struct rlimit limit;
1813 
1814     if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {
1815         serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
1816             strerror(errno));
1817         server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS;
1818     } else {
1819         rlim_t oldlimit = limit.rlim_cur;
1820 
1821         /* Set the max number of files if the current limit is not enough
1822          * for our needs. */
1823         if (oldlimit < maxfiles) {
1824             rlim_t bestlimit;
1825             int setrlimit_error = 0;
1826 
1827             /* Try to set the file limit to match 'maxfiles' or at least
1828              * to the higher value supported less than maxfiles. */
1829             bestlimit = maxfiles;
1830             while(bestlimit > oldlimit) {
1831                 rlim_t decr_step = 16;
1832 
1833                 limit.rlim_cur = bestlimit;
1834                 limit.rlim_max = bestlimit;
1835                 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break;
1836                 setrlimit_error = errno;
1837 
1838                 /* We failed to set file limit to 'bestlimit'. Try with a
1839                  * smaller limit decrementing by a few FDs per iteration. */
1840                 if (bestlimit < decr_step) break;
1841                 bestlimit -= decr_step;
1842             }
1843 
1844             /* Assume that the limit we get initially is still valid if
1845              * our last try was even lower. */
1846             if (bestlimit < oldlimit) bestlimit = oldlimit;
1847 
1848             if (bestlimit < maxfiles) {
1849                 unsigned int old_maxclients = server.maxclients;
1850                 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS;
1851                 /* maxclients is unsigned so may overflow: in order
1852                  * to check if maxclients is now logically less than 1
1853                  * we test indirectly via bestlimit. */
1854                 if (bestlimit <= CONFIG_MIN_RESERVED_FDS) {
1855                     serverLog(LL_WARNING,"Your current 'ulimit -n' "
1856                         "of %llu is not enough for the server to start. "
1857                         "Please increase your open file limit to at least "
1858                         "%llu. Exiting.",
1859                         (unsigned long long) oldlimit,
1860                         (unsigned long long) maxfiles);
1861                     exit(1);
1862                 }
1863                 serverLog(LL_WARNING,"You requested maxclients of %d "
1864                     "requiring at least %llu max file descriptors.",
1865                     old_maxclients,
1866                     (unsigned long long) maxfiles);
1867                 serverLog(LL_WARNING,"Server can't set maximum open files "
1868                     "to %llu because of OS error: %s.",
1869                     (unsigned long long) maxfiles, strerror(setrlimit_error));
1870                 serverLog(LL_WARNING,"Current maximum open files is %llu. "
1871                     "maxclients has been reduced to %d to compensate for "
1872                     "low ulimit. "
1873                     "If you need higher maxclients increase 'ulimit -n'.",
1874                     (unsigned long long) bestlimit, server.maxclients);
1875             } else {
1876                 serverLog(LL_NOTICE,"Increased maximum number of open files "
1877                     "to %llu (it was originally set to %llu).",
1878                     (unsigned long long) maxfiles,
1879                     (unsigned long long) oldlimit);
1880             }
1881         }
1882     }
1883 }
1884 
/* Warn when the configured listen() backlog (server.tcp_backlog) is larger
 * than the value in /proc/sys/net/core/somaxconn, since the kernel cannot
 * honor it. Does nothing unless HAVE_PROC_SOMAXCONN is defined. It stays
 * silent if the file cannot be opened or read, or if the value read is not
 * positive. */
void checkTcpBacklogSettings(void) {
#ifdef HAVE_PROC_SOMAXCONN
    char line[1024];
    int somaxconn;
    FILE *fp = fopen("/proc/sys/net/core/somaxconn","r");

    if (fp == NULL) return;
    somaxconn = (fgets(line,sizeof(line),fp) != NULL) ? atoi(line) : 0;
    fclose(fp);

    if (somaxconn > 0 && somaxconn < server.tcp_backlog) {
        serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, somaxconn);
    }
#endif
}
1901 
1902 /* Initialize a set of file descriptors to listen to the specified 'port'
1903  * binding the addresses specified in the Redis server configuration.
1904  *
1905  * The listening file descriptors are stored in the integer array 'fds'
1906  * and their number is set in '*count'.
1907  *
1908  * The addresses to bind are specified in the global server.bindaddr array
1909  * and their number is server.bindaddr_count. If the server configuration
1910  * contains no specific addresses to bind, this function will try to
1911  * bind * (all addresses) for both the IPv4 and IPv6 protocols.
1912  *
1913  * On success the function returns C_OK.
1914  *
1915  * On error the function returns C_ERR. For the function to be on
1916  * error, at least one of the server.bindaddr addresses was
1917  * impossible to bind, or no bind addresses were specified in the server
1918  * configuration but the function is not able to bind * for at least
1919  * one of the IPv4 or IPv6 protocols. */
listenToPort(int port,int * fds,int * count)1920 int listenToPort(int port, int *fds, int *count) {
1921     int j;
1922 
1923     /* Force binding of 0.0.0.0 if no bind address is specified, always
1924      * entering the loop if j == 0. */
1925     if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
1926     for (j = 0; j < server.bindaddr_count || j == 0; j++) {
1927         if (server.bindaddr[j] == NULL) {
1928             int unsupported = 0;
1929             /* Bind * for both IPv6 and IPv4, we enter here only if
1930              * server.bindaddr_count == 0. */
1931             fds[*count] = anetTcp6Server(server.neterr,port,NULL,
1932                 server.tcp_backlog);
1933             if (fds[*count] != ANET_ERR) {
1934                 anetNonBlock(NULL,fds[*count]);
1935                 (*count)++;
1936             } else if (errno == EAFNOSUPPORT) {
1937                 unsupported++;
1938                 serverLog(LL_WARNING,"Not listening to IPv6: unsupproted");
1939             }
1940 
1941             if (*count == 1 || unsupported) {
1942                 /* Bind the IPv4 address as well. */
1943                 fds[*count] = anetTcpServer(server.neterr,port,NULL,
1944                     server.tcp_backlog);
1945                 if (fds[*count] != ANET_ERR) {
1946                     anetNonBlock(NULL,fds[*count]);
1947                     (*count)++;
1948                 } else if (errno == EAFNOSUPPORT) {
1949                     unsupported++;
1950                     serverLog(LL_WARNING,"Not listening to IPv4: unsupproted");
1951                 }
1952             }
1953             /* Exit the loop if we were able to bind * on IPv4 and IPv6,
1954              * otherwise fds[*count] will be ANET_ERR and we'll print an
1955              * error and return to the caller with an error. */
1956             if (*count + unsupported == 2) break;
1957         } else if (strchr(server.bindaddr[j],':')) {
1958             /* Bind IPv6 address. */
1959             fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
1960                 server.tcp_backlog);
1961         } else {
1962             /* Bind IPv4 address. */
1963             fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
1964                 server.tcp_backlog);
1965         }
1966         if (fds[*count] == ANET_ERR) {
1967             serverLog(LL_WARNING,
1968                 "Could not create server TCP listening socket %s:%d: %s",
1969                 server.bindaddr[j] ? server.bindaddr[j] : "*",
1970                 port, server.neterr);
1971                 if (errno == ENOPROTOOPT     || errno == EPROTONOSUPPORT ||
1972                     errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
1973                     errno == EAFNOSUPPORT    || errno == EADDRNOTAVAIL)
1974                     continue;
1975             return C_ERR;
1976         }
1977         anetNonBlock(NULL,fds[*count]);
1978         (*count)++;
1979     }
1980     return C_OK;
1981 }
1982 
1983 /* Resets the stats that we expose via INFO or other means that we want
1984  * to reset via CONFIG RESETSTAT. The function is also used in order to
1985  * initialize these fields in initServer() at server startup. */
resetServerStats(void)1986 void resetServerStats(void) {
1987     int j;
1988 
1989     server.stat_numcommands = 0;
1990     server.stat_numconnections = 0;
1991     server.stat_expiredkeys = 0;
1992     server.stat_expired_stale_perc = 0;
1993     server.stat_expired_time_cap_reached_count = 0;
1994     server.stat_evictedkeys = 0;
1995     server.stat_keyspace_misses = 0;
1996     server.stat_keyspace_hits = 0;
1997     server.stat_active_defrag_hits = 0;
1998     server.stat_active_defrag_misses = 0;
1999     server.stat_active_defrag_key_hits = 0;
2000     server.stat_active_defrag_key_misses = 0;
2001     server.stat_active_defrag_scanned = 0;
2002     server.stat_fork_time = 0;
2003     server.stat_fork_rate = 0;
2004     server.stat_rejected_conn = 0;
2005     server.stat_sync_full = 0;
2006     server.stat_sync_partial_ok = 0;
2007     server.stat_sync_partial_err = 0;
2008     for (j = 0; j < STATS_METRIC_COUNT; j++) {
2009         server.inst_metric[j].idx = 0;
2010         server.inst_metric[j].last_sample_time = mstime();
2011         server.inst_metric[j].last_sample_count = 0;
2012         memset(server.inst_metric[j].samples,0,
2013             sizeof(server.inst_metric[j].samples));
2014     }
2015     server.stat_net_input_bytes = 0;
2016     server.stat_net_output_bytes = 0;
2017     server.aof_delayed_fsync = 0;
2018 }
2019 
initServer(void)2020 void initServer(void) {
2021     int j;
2022 
2023     signal(SIGHUP, SIG_IGN);
2024     signal(SIGPIPE, SIG_IGN);
2025     setupSignalHandlers();
2026 
2027     if (server.syslog_enabled) {
2028         openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
2029             server.syslog_facility);
2030     }
2031 
2032     server.hz = server.config_hz;
2033     server.pid = getpid();
2034     server.current_client = NULL;
2035     server.clients = listCreate();
2036     server.clients_index = raxNew();
2037     server.clients_to_close = listCreate();
2038     server.slaves = listCreate();
2039     server.monitors = listCreate();
2040     server.clients_pending_write = listCreate();
2041     server.slaveseldb = -1; /* Force to emit the first SELECT command. */
2042     server.unblocked_clients = listCreate();
2043     server.ready_keys = listCreate();
2044     server.clients_waiting_acks = listCreate();
2045     server.get_ack_from_slaves = 0;
2046     server.clients_paused = 0;
2047     server.system_memory_size = zmalloc_get_memory_size();
2048 
2049     createSharedObjects();
2050     adjustOpenFilesLimit();
2051     server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
2052     if (server.el == NULL) {
2053         serverLog(LL_WARNING,
2054             "Failed creating the event loop. Error message: '%s'",
2055             strerror(errno));
2056         exit(1);
2057     }
2058     server.db = zmalloc(sizeof(redisDb)*server.dbnum);
2059 
2060     /* Open the TCP listening socket for the user commands. */
2061     if (server.port != 0 &&
2062         listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
2063         exit(1);
2064 
2065     /* Open the listening Unix domain socket. */
2066     if (server.unixsocket != NULL) {
2067         unlink(server.unixsocket); /* don't care if this fails */
2068         server.sofd = anetUnixServer(server.neterr,server.unixsocket,
2069             server.unixsocketperm, server.tcp_backlog);
2070         if (server.sofd == ANET_ERR) {
2071             serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
2072             exit(1);
2073         }
2074         anetNonBlock(NULL,server.sofd);
2075     }
2076 
2077     /* Abort if there are no listening sockets at all. */
2078     if (server.ipfd_count == 0 && server.sofd < 0) {
2079         serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
2080         exit(1);
2081     }
2082 
2083     /* Create the Redis databases, and initialize other internal state. */
2084     for (j = 0; j < server.dbnum; j++) {
2085         server.db[j].dict = dictCreate(&dbDictType,NULL);
2086         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
2087         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
2088         server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
2089         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
2090         server.db[j].id = j;
2091         server.db[j].avg_ttl = 0;
2092         server.db[j].defrag_later = listCreate();
2093     }
2094     evictionPoolAlloc(); /* Initialize the LRU keys pool. */
2095     server.pubsub_channels = dictCreate(&keylistDictType,NULL);
2096     server.pubsub_patterns = listCreate();
2097     listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
2098     listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
2099     server.cronloops = 0;
2100     server.rdb_child_pid = -1;
2101     server.aof_child_pid = -1;
2102     server.rdb_child_type = RDB_CHILD_TYPE_NONE;
2103     server.rdb_bgsave_scheduled = 0;
2104     server.child_info_pipe[0] = -1;
2105     server.child_info_pipe[1] = -1;
2106     server.child_info_data.magic = 0;
2107     aofRewriteBufferReset();
2108     server.aof_buf = sdsempty();
2109     server.lastsave = time(NULL); /* At startup we consider the DB saved. */
2110     server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
2111     server.rdb_save_time_last = -1;
2112     server.rdb_save_time_start = -1;
2113     server.dirty = 0;
2114     resetServerStats();
2115     /* A few stats we don't want to reset: server startup time, and peak mem. */
2116     server.stat_starttime = time(NULL);
2117     server.stat_peak_memory = 0;
2118     server.stat_rdb_cow_bytes = 0;
2119     server.stat_aof_cow_bytes = 0;
2120     server.cron_malloc_stats.zmalloc_used = 0;
2121     server.cron_malloc_stats.process_rss = 0;
2122     server.cron_malloc_stats.allocator_allocated = 0;
2123     server.cron_malloc_stats.allocator_active = 0;
2124     server.cron_malloc_stats.allocator_resident = 0;
2125     server.lastbgsave_status = C_OK;
2126     server.aof_last_write_status = C_OK;
2127     server.aof_last_write_errno = 0;
2128     server.repl_good_slaves_count = 0;
2129 
2130     /* Create the timer callback, this is our way to process many background
2131      * operations incrementally, like clients timeout, eviction of unaccessed
2132      * expired keys and so forth. */
2133     if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
2134         serverPanic("Can't create event loop timers.");
2135         exit(1);
2136     }
2137 
2138     /* Create an event handler for accepting new connections in TCP and Unix
2139      * domain sockets. */
2140     for (j = 0; j < server.ipfd_count; j++) {
2141         if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
2142             acceptTcpHandler,NULL) == AE_ERR)
2143             {
2144                 serverPanic(
2145                     "Unrecoverable error creating server.ipfd file event.");
2146             }
2147     }
2148     if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
2149         acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
2150 
2151 #ifndef HAVE_FF_KQUEUE
2152     /* Register a readable event for the pipe used to awake the event loop
2153      * when a blocked client in a module needs attention. */
2154     if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
2155         moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
2156             serverPanic(
2157                 "Error registering the readable event for the module "
2158                 "blocked clients subsystem.");
2159     }
2160 #endif
2161 
2162     /* Open the AOF file if needed. */
2163     if (server.aof_state == AOF_ON) {
2164         server.aof_fd = open(server.aof_filename,
2165                                O_WRONLY|O_APPEND|O_CREAT,0644);
2166         if (server.aof_fd == -1) {
2167             serverLog(LL_WARNING, "Can't open the append-only file: %s",
2168                 strerror(errno));
2169             exit(1);
2170         }
2171     }
2172 
2173     /* 32 bit instances are limited to 4GB of address space, so if there is
2174      * no explicit limit in the user provided configuration we set a limit
2175      * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
2176      * useless crashes of the Redis instance for out of memory. */
2177     if (server.arch_bits == 32 && server.maxmemory == 0) {
2178         serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
2179         server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
2180         server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
2181     }
2182 
2183     if (server.cluster_enabled) clusterInit();
2184     replicationScriptCacheInit();
2185     scriptingInit(1);
2186     slowlogInit();
2187     latencyMonitorInit();
2188     bioInit();
2189     server.initial_memory_usage = zmalloc_used_memory();
2190 }
2191 
2192 /* Populates the Redis Command Table starting from the hard coded list
2193  * we have on top of redis.c file. */
populateCommandTable(void)2194 void populateCommandTable(void) {
2195     int j;
2196     int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
2197 
2198     for (j = 0; j < numcommands; j++) {
2199         struct redisCommand *c = redisCommandTable+j;
2200         char *f = c->sflags;
2201         int retval1, retval2;
2202 
2203         while(*f != '\0') {
2204             switch(*f) {
2205             case 'w': c->flags |= CMD_WRITE; break;
2206             case 'r': c->flags |= CMD_READONLY; break;
2207             case 'm': c->flags |= CMD_DENYOOM; break;
2208             case 'a': c->flags |= CMD_ADMIN; break;
2209             case 'p': c->flags |= CMD_PUBSUB; break;
2210             case 's': c->flags |= CMD_NOSCRIPT; break;
2211             case 'R': c->flags |= CMD_RANDOM; break;
2212             case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
2213             case 'l': c->flags |= CMD_LOADING; break;
2214             case 't': c->flags |= CMD_STALE; break;
2215             case 'M': c->flags |= CMD_SKIP_MONITOR; break;
2216             case 'k': c->flags |= CMD_ASKING; break;
2217             case 'F': c->flags |= CMD_FAST; break;
2218             default: serverPanic("Unsupported command flag"); break;
2219             }
2220             f++;
2221         }
2222 
2223         retval1 = dictAdd(server.commands, sdsnew(c->name), c);
2224         /* Populate an additional dictionary that will be unaffected
2225          * by rename-command statements in redis.conf. */
2226         retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
2227         serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
2228     }
2229 }
2230 
resetCommandTableStats(void)2231 void resetCommandTableStats(void) {
2232     struct redisCommand *c;
2233     dictEntry *de;
2234     dictIterator *di;
2235 
2236     di = dictGetSafeIterator(server.commands);
2237     while((de = dictNext(di)) != NULL) {
2238         c = (struct redisCommand *) dictGetVal(de);
2239         c->microseconds = 0;
2240         c->calls = 0;
2241     }
2242     dictReleaseIterator(di);
2243 
2244 }
2245 
2246 /* ========================== Redis OP Array API ============================ */
2247 
/* Put 'oa' in the empty state: zero ops and no backing storage, so the
 * first redisOpArrayAppend() allocates from scratch. */
void redisOpArrayInit(redisOpArray *oa) {
    oa->numops = 0;
    oa->ops = NULL;
}
2252 
/* Append one operation to 'oa', growing the backing array by exactly one
 * slot via zrealloc().
 *
 * The 'argv' array and the object references it holds are stored as-is and
 * later released by redisOpArrayFree(). Returns the number of ops in 'oa'
 * after the append. */
int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
                       robj **argv, int argc, int target)
{
    int slot = oa->numops;
    redisOp *entry;

    oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(slot+1));
    entry = &oa->ops[slot];
    entry->cmd = cmd;
    entry->dbid = dbid;
    entry->argv = argv;
    entry->argc = argc;
    entry->target = target;

    oa->numops = slot+1;
    return oa->numops;
}
2268 
/* Release everything stored in 'oa'. For each op, from last to first, drop
 * one reference on every argument object and free the op's argv array;
 * then free the ops array itself.
 *
 * On return 'oa' is in the same state redisOpArrayInit() produces
 * (ops == NULL, numops == 0). It can therefore be appended to or freed
 * again without touching released memory, instead of keeping a dangling
 * 'ops' pointer. */
void redisOpArrayFree(redisOpArray *oa) {
    while(oa->numops) {
        int j;
        redisOp *op;

        oa->numops--;
        op = oa->ops+oa->numops;
        for (j = 0; j < op->argc; j++)
            decrRefCount(op->argv[j]);
        zfree(op->argv);
    }
    zfree(oa->ops);
    oa->ops = NULL; /* Avoid a dangling pointer into freed storage. */
}
2282 
2283 /* ====================== Commands lookup and execution ===================== */
2284 
/* Return the command registered as 'name' in server.commands (the table
 * that reflects rename-command), or NULL if no such entry exists. */
struct redisCommand *lookupCommand(sds name) {
    struct redisCommand *found = dictFetchValue(server.commands, name);

    return found;
}
2288 
lookupCommandByCString(char * s)2289 struct redisCommand *lookupCommandByCString(char *s) {
2290     struct redisCommand *cmd;
2291     sds name = sdsnew(s);
2292 
2293     cmd = dictFetchValue(server.commands, name);
2294     sdsfree(name);
2295     return cmd;
2296 }
2297 
2298 /* Lookup the command in the current table, if not found also check in
2299  * the original table containing the original command names unaffected by
2300  * redis.conf rename-command statement.
2301  *
2302  * This is used by functions rewriting the argument vector such as
2303  * rewriteClientCommandVector() in order to set client->cmd pointer
2304  * correctly even if the command was renamed. */
lookupCommandOrOriginal(sds name)2305 struct redisCommand *lookupCommandOrOriginal(sds name) {
2306     struct redisCommand *cmd = dictFetchValue(server.commands, name);
2307 
2308     if (!cmd) cmd = dictFetchValue(server.orig_commands,name);
2309     return cmd;
2310 }
2311 
2312 /* Propagate the specified command (in the context of the specified database id)
2313  * to AOF and Slaves.
2314  *
2315  * flags are an xor between:
2316  * + PROPAGATE_NONE (no propagation of command at all)
2317  * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
2318  * + PROPAGATE_REPL (propagate into the replication link)
2319  *
2320  * This should not be used inside commands implementation. Use instead
2321  * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
2322  */
propagate(struct redisCommand * cmd,int dbid,robj ** argv,int argc,int flags)2323 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2324                int flags)
2325 {
2326     if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
2327         feedAppendOnlyFile(cmd,dbid,argv,argc);
2328     if (flags & PROPAGATE_REPL)
2329         replicationFeedSlaves(server.slaves,dbid,argv,argc);
2330 }
2331 
2332 /* Used inside commands to schedule the propagation of additional commands
2333  * after the current command is propagated to AOF / Replication.
2334  *
2335  * 'cmd' must be a pointer to the Redis command to replicate, dbid is the
2336  * database ID the command should be propagated into.
2337  * Arguments of the command to propagte are passed as an array of redis
2338  * objects pointers of len 'argc', using the 'argv' vector.
2339  *
2340  * The function does not take a reference to the passed 'argv' vector,
2341  * so it is up to the caller to release the passed argv (but it is usually
2342  * stack allocated).  The function autoamtically increments ref count of
2343  * passed objects, so the caller does not need to. */
alsoPropagate(struct redisCommand * cmd,int dbid,robj ** argv,int argc,int target)2344 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2345                    int target)
2346 {
2347     robj **argvcopy;
2348     int j;
2349 
2350     if (server.loading) return; /* No propagation during loading. */
2351 
2352     argvcopy = zmalloc(sizeof(robj*)*argc);
2353     for (j = 0; j < argc; j++) {
2354         argvcopy[j] = argv[j];
2355         incrRefCount(argv[j]);
2356     }
2357     redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target);
2358 }
2359 
2360 /* It is possible to call the function forceCommandPropagation() inside a
2361  * Redis command implementation in order to to force the propagation of a
2362  * specific command execution into AOF / Replication. */
forceCommandPropagation(client * c,int flags)2363 void forceCommandPropagation(client *c, int flags) {
2364     if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL;
2365     if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
2366 }
2367 
2368 /* Avoid that the executed command is propagated at all. This way we
2369  * are free to just propagate what we want using the alsoPropagate()
2370  * API. */
preventCommandPropagation(client * c)2371 void preventCommandPropagation(client *c) {
2372     c->flags |= CLIENT_PREVENT_PROP;
2373 }
2374 
2375 /* AOF specific version of preventCommandPropagation(). */
preventCommandAOF(client * c)2376 void preventCommandAOF(client *c) {
2377     c->flags |= CLIENT_PREVENT_AOF_PROP;
2378 }
2379 
2380 /* Replication specific version of preventCommandPropagation(). */
preventCommandReplication(client * c)2381 void preventCommandReplication(client *c) {
2382     c->flags |= CLIENT_PREVENT_REPL_PROP;
2383 }
2384 
/* Call() is the core of Redis execution of a command.
 *
 * The following flags can be passed:
 * CMD_CALL_NONE        No flags.
 * CMD_CALL_SLOWLOG     Check command speed and log in the slow log if needed.
 * CMD_CALL_STATS       Populate command stats.
 * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
 *                          or if the client flags are forcing propagation.
 * CMD_CALL_PROPAGATE_REPL  Send command to slaves if it modified the dataset
 *                          or if the client flags are forcing propagation.
 * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
 * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
 *
 * The exact propagation behavior depends on the client flags.
 * Specifically:
 *
 * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
 *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
 *    in the call flags, then the command is propagated even if the
 *    dataset was not affected by the command.
 * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
 *    are set, the propagation into AOF or to slaves is not performed even
 *    if the command modified the dataset.
 *
 * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
 * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
 * slaves propagation will never occur.
 *
 * Client flags are modified by the implementation of a given command
 * using the following API:
 *
 * forceCommandPropagation(client *c, int flags);
 * preventCommandPropagation(client *c);
 * preventCommandAOF(client *c);
 * preventCommandReplication(client *c);
 *
 * Commands scheduled with alsoPropagate() during this call are collected in
 * a fresh server.also_propagate array. That array is propagated (subject to
 * the call flags) and freed before the caller's array is restored, so
 * nested call() invocations do not mix their scheduled commands.
 */
void call(client *c, int flags) {
    long long dirty, start, duration;
    /* Saved so the FORCE/PREVENT propagation bits can be restored on exit:
     * call() can be executed recursively. */
    int client_old_flags = c->flags;
    /* Captured before proc() runs, because the command implementation may
     * change c->cmd (e.g. MULTI/EXEC or rewritten commands). Stats are
     * charged to the command that was actually invoked. */
    struct redisCommand *real_cmd = c->cmd;

    /* Send the command to clients in MONITOR mode, only if the commands are
     * not generated from reading an AOF. */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }

    /* Initialization: clear the flags that must be set by the command on
     * demand, and initialize the array for additional commands propagation.
     * The caller's pending also_propagate array is stashed and restored at
     * the end of this function. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    redisOpArray prev_also_propagate = server.also_propagate;
    redisOpArrayInit(&server.also_propagate);

    /* Call the command. The change in server.dirty measures whether the
     * command modified the dataset; it is clamped at zero in case
     * server.dirty was decreased or reset while the command ran. */
    dirty = server.dirty;
    start = ustime();
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;

    /* When EVAL is called loading the AOF we don't want commands called
     * from Lua to go into the slowlog or to populate statistics. */
    if (server.loading && c->flags & CLIENT_LUA)
        flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);

    /* If the caller is Lua, we want to force the EVAL caller to propagate
     * the script if the command flag or client flag are forcing the
     * propagation. */
    if (c->flags & CLIENT_LUA && server.lua_caller) {
        if (c->flags & CLIENT_FORCE_REPL)
            server.lua_caller->flags |= CLIENT_FORCE_REPL;
        if (c->flags & CLIENT_FORCE_AOF)
            server.lua_caller->flags |= CLIENT_FORCE_AOF;
    }

    /* Log the command into the Slow log if needed, and populate the
     * per-command statistics that we show in INFO commandstats.
     * EXEC itself is skipped here. The latency sample is recorded in
     * milliseconds (duration is in microseconds). */
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    }
    if (flags & CMD_CALL_STATS) {
        /* Use the real command that was executed (cmd and lastcmd may be
         * different, in case of MULTI-EXEC or re-written commands such as
         * EXPIRE, GEOADD, etc.). */
        real_cmd->microseconds += duration;
        real_cmd->calls++;
    }

    /* Propagate the command into the AOF and replication link, unless the
     * command prevented both kinds of propagation. */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;

        /* Check if the command operated changes in the data set. If so
         * set for replication / AOF propagation. */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

        /* If the client forced AOF / replication of the command, set
         * the flags regardless of the command effects on the data set. */
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

        /* However prevent AOF / replication propagation if the command
         * implementations called preventCommandPropagation() or similar,
         * or if we don't have the call() flags to do so. */
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;

        /* Call propagate() only if at least one of AOF / replication
         * propagation is needed. Note that modules commands handle replication
         * in an explicit way, so we never replicate them automatically. */
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }

    /* Restore the old replication flags, since call() can be executed
     * recursively. */
    c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
    c->flags |= client_old_flags &
        (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);

    /* Handle the alsoPropagate() API to handle commands that want to propagate
     * multiple separated commands. Note that alsoPropagate() is not affected
     * by CLIENT_PREVENT_PROP flag. The scheduled ops are always freed, even
     * when the call flags do not allow any propagation. */
    if (server.also_propagate.numops) {
        int j;
        redisOp *rop;

        if (flags & CMD_CALL_PROPAGATE) {
            for (j = 0; j < server.also_propagate.numops; j++) {
                rop = &server.also_propagate.ops[j];
                int target = rop->target;
                /* Whatever the command wish is, we honor the call() flags. */
                if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
                if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
                if (target)
                    propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
            }
        }
        redisOpArrayFree(&server.also_propagate);
    }
    /* Give the caller back its own pending also_propagate array. */
    server.also_propagate = prev_also_propagate;
    server.stat_numcommands++;
}
2542 
/* If this function gets called we have already read a whole
 * command, and its arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If C_OK is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise,
 * if C_ERR is returned, the client was destroyed or is going to be
 * closed (e.g. after QUIT, which flags it to be closed after the reply). */
processCommand(client * c)2551 int processCommand(client *c) {
2552     moduleCallCommandFilters(c);
2553 
2554     /* The QUIT command is handled separately. Normal command procs will
2555      * go through checking for replication and QUIT will cause trouble
2556      * when FORCE_REPLICATION is enabled and would be implemented in
2557      * a regular command proc. */
2558     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
2559         addReply(c,shared.ok);
2560         c->flags |= CLIENT_CLOSE_AFTER_REPLY;
2561         return C_ERR;
2562     }
2563 
2564     /* Now lookup the command and check ASAP about trivial error conditions
2565      * such as wrong arity, bad command name and so forth. */
2566     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
2567     if (!c->cmd) {
2568         flagTransaction(c);
2569         sds args = sdsempty();
2570         int i;
2571         for (i=1; i < c->argc && sdslen(args) < 128; i++)
2572             args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
2573         addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
2574             (char*)c->argv[0]->ptr, args);
2575         sdsfree(args);
2576         return C_OK;
2577     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
2578                (c->argc < -c->cmd->arity)) {
2579         flagTransaction(c);
2580         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2581             c->cmd->name);
2582         return C_OK;
2583     }
2584 
2585     /* Check if the user is authenticated */
2586     if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
2587     {
2588         flagTransaction(c);
2589         addReply(c,shared.noautherr);
2590         return C_OK;
2591     }
2592 
2593     /* If cluster is enabled perform the cluster redirection here.
2594      * However we don't perform the redirection if:
2595      * 1) The sender of this command is our master.
2596      * 2) The command has no key arguments. */
2597     if (server.cluster_enabled &&
2598         !(c->flags & CLIENT_MASTER) &&
2599         !(c->flags & CLIENT_LUA &&
2600           server.lua_caller->flags & CLIENT_MASTER) &&
2601         !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
2602           c->cmd->proc != execCommand))
2603     {
2604         int hashslot;
2605         int error_code;
2606         clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
2607                                         &hashslot,&error_code);
2608         if (n == NULL || n != server.cluster->myself) {
2609             if (c->cmd->proc == execCommand) {
2610                 discardTransaction(c);
2611             } else {
2612                 flagTransaction(c);
2613             }
2614             clusterRedirectClient(c,n,hashslot,error_code);
2615             return C_OK;
2616         }
2617     }
2618 
2619     /* Handle the maxmemory directive.
2620      *
2621      * Note that we do not want to reclaim memory if we are here re-entering
2622      * the event loop since there is a busy Lua script running in timeout
2623      * condition, to avoid mixing the propagation of scripts with the
2624      * propagation of DELs due to eviction. */
2625     if (server.maxmemory && !server.lua_timedout) {
2626         int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
2627         /* freeMemoryIfNeeded may flush slave output buffers. This may result
2628          * into a slave, that may be the active client, to be freed. */
2629         if (server.current_client == NULL) return C_ERR;
2630 
2631         /* It was impossible to free enough memory, and the command the client
2632          * is trying to execute is denied during OOM conditions or the client
2633          * is in MULTI/EXEC context? Error. */
2634         if (out_of_memory &&
2635             (c->cmd->flags & CMD_DENYOOM ||
2636              (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {
2637             flagTransaction(c);
2638             addReply(c, shared.oomerr);
2639             return C_OK;
2640         }
2641     }
2642 
2643     /* Don't accept write commands if there are problems persisting on disk
2644      * and if this is a master instance. */
2645     int deny_write_type = writeCommandsDeniedByDiskError();
2646     if (deny_write_type != DISK_ERROR_TYPE_NONE &&
2647         server.masterhost == NULL &&
2648         (c->cmd->flags & CMD_WRITE ||
2649          c->cmd->proc == pingCommand))
2650     {
2651         flagTransaction(c);
2652         if (deny_write_type == DISK_ERROR_TYPE_RDB)
2653             addReply(c, shared.bgsaveerr);
2654         else
2655             addReplySds(c,
2656                 sdscatprintf(sdsempty(),
2657                 "-MISCONF Errors writing to the AOF file: %s\r\n",
2658                 strerror(server.aof_last_write_errno)));
2659         return C_OK;
2660     }
2661 
2662     /* Don't accept write commands if there are not enough good slaves and
2663      * user configured the min-slaves-to-write option. */
2664     if (server.masterhost == NULL &&
2665         server.repl_min_slaves_to_write &&
2666         server.repl_min_slaves_max_lag &&
2667         c->cmd->flags & CMD_WRITE &&
2668         server.repl_good_slaves_count < server.repl_min_slaves_to_write)
2669     {
2670         flagTransaction(c);
2671         addReply(c, shared.noreplicaserr);
2672         return C_OK;
2673     }
2674 
2675     /* Don't accept write commands if this is a read only slave. But
2676      * accept write commands if this is our master. */
2677     if (server.masterhost && server.repl_slave_ro &&
2678         !(c->flags & CLIENT_MASTER) &&
2679         c->cmd->flags & CMD_WRITE)
2680     {
2681         addReply(c, shared.roslaveerr);
2682         return C_OK;
2683     }
2684 
2685     /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
2686     if (c->flags & CLIENT_PUBSUB &&
2687         c->cmd->proc != pingCommand &&
2688         c->cmd->proc != subscribeCommand &&
2689         c->cmd->proc != unsubscribeCommand &&
2690         c->cmd->proc != psubscribeCommand &&
2691         c->cmd->proc != punsubscribeCommand) {
2692         addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
2693         return C_OK;
2694     }
2695 
2696     /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
2697      * when slave-serve-stale-data is no and we are a slave with a broken
2698      * link with master. */
2699     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
2700         server.repl_serve_stale_data == 0 &&
2701         !(c->cmd->flags & CMD_STALE))
2702     {
2703         flagTransaction(c);
2704         addReply(c, shared.masterdownerr);
2705         return C_OK;
2706     }
2707 
2708     /* Loading DB? Return an error if the command has not the
2709      * CMD_LOADING flag. */
2710     if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
2711         addReply(c, shared.loadingerr);
2712         return C_OK;
2713     }
2714 
2715     /* Lua script too slow? Only allow a limited number of commands. */
2716     if (server.lua_timedout &&
2717           c->cmd->proc != authCommand &&
2718           c->cmd->proc != replconfCommand &&
2719         !(c->cmd->proc == shutdownCommand &&
2720           c->argc == 2 &&
2721           tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
2722         !(c->cmd->proc == scriptCommand &&
2723           c->argc == 2 &&
2724           tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
2725     {
2726         flagTransaction(c);
2727         addReply(c, shared.slowscripterr);
2728         return C_OK;
2729     }
2730 
2731     /* Exec the command */
2732     if (c->flags & CLIENT_MULTI &&
2733         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
2734         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
2735     {
2736         queueMultiCommand(c);
2737         addReply(c,shared.queued);
2738     } else {
2739         call(c,CMD_CALL_FULL);
2740         c->woff = server.master_repl_offset;
2741         if (listLength(server.ready_keys))
2742             handleClientsBlockedOnKeys();
2743     }
2744     return C_OK;
2745 }
2746 
2747 /*================================== Shutdown =============================== */
2748 
2749 /* Close listening sockets. Also unlink the unix domain socket if
2750  * unlink_unix_socket is non-zero. */
closeListeningSockets(int unlink_unix_socket)2751 void closeListeningSockets(int unlink_unix_socket) {
2752     int j;
2753 
2754     for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]);
2755     if (server.sofd != -1) close(server.sofd);
2756     if (server.cluster_enabled)
2757         for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]);
2758     if (unlink_unix_socket && server.unixsocket) {
2759         serverLog(LL_NOTICE,"Removing the unix socket file.");
2760         unlink(server.unixsocket); /* don't care if this fails */
2761     }
2762 }
2763 
/* Intended to be called right after fork(), so that the new process does not
 * run on the same CPU core(s) the Redis server thread is pinned to.
 *
 * When built with F-Stack (HAVE_FF_KQUEUE), the calling thread's affinity is
 * replaced by the complement of its current mask: every CPU index below
 * CPU_SETSIZE that it is NOT currently allowed on. If 'name' is not NULL the
 * thread is also renamed. Without HAVE_FF_KQUEUE this function does nothing.
 *
 * All declarations live inside the #ifdef because cpu_set_t and the *_np
 * affinity calls are GNU extensions that are not available everywhere. */
void resetCpuAffinity(const char* name)
{
#ifdef HAVE_FF_KQUEUE
    pthread_t thread = pthread_self();
    cpu_set_t cpuset_frm, cpuset_to;
    int j, s;

    CPU_ZERO(&cpuset_frm);
    CPU_ZERO(&cpuset_to);

    /* The result is not checked: if the call fails and leaves cpuset_frm
     * empty, the complement below allows the thread on every CPU. */
    pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset_frm);
    for (j = 0; j < CPU_SETSIZE; j++) {
        if (!CPU_ISSET(j, &cpuset_frm))
            CPU_SET(j, &cpuset_to);
    }
    s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset_to);
    if (s != 0)
        serverLog(LL_WARNING,"set cpu affinity, failed.");

    if (name != NULL) {
        /* Linux limits thread names to 16 bytes including the terminating
         * NUL. Longer names make pthread_setname_np() fail with ERANGE, so
         * truncate rather than silently leave the thread unnamed. */
        char tname[16];
        snprintf(tname, sizeof(tname), "%s", name);
        pthread_setname_np(thread, tname);
    }
#else
    (void)name;
#endif
}
2793 
prepareForShutdown(int flags)2794 int prepareForShutdown(int flags) {
2795     int save = flags & SHUTDOWN_SAVE;
2796     int nosave = flags & SHUTDOWN_NOSAVE;
2797 
2798     serverLog(LL_WARNING,"User requested shutdown...");
2799 
2800     /* Kill all the Lua debugger forked sessions. */
2801     ldbKillForkedSessions();
2802 
2803     /* Kill the saving child if there is a background saving in progress.
2804        We want to avoid race conditions, for instance our saving child may
2805        overwrite the synchronous saving did by SHUTDOWN. */
2806     if (server.rdb_child_pid != -1) {
2807         serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
2808         kill(server.rdb_child_pid,SIGUSR1);
2809         rdbRemoveTempFile(server.rdb_child_pid);
2810     }
2811 
2812     if (server.aof_state != AOF_OFF) {
2813         /* Kill the AOF saving child as the AOF we already have may be longer
2814          * but contains the full dataset anyway. */
2815         if (server.aof_child_pid != -1) {
2816             /* If we have AOF enabled but haven't written the AOF yet, don't
2817              * shutdown or else the dataset will be lost. */
2818             if (server.aof_state == AOF_WAIT_REWRITE) {
2819                 serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
2820                 return C_ERR;
2821             }
2822             serverLog(LL_WARNING,
2823                 "There is a child rewriting the AOF. Killing it!");
2824             kill(server.aof_child_pid,SIGUSR1);
2825         }
2826         /* Append only file: flush buffers and fsync() the AOF at exit */
2827         serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
2828         flushAppendOnlyFile(1);
2829         redis_fsync(server.aof_fd);
2830     }
2831 
2832     /* Create a new RDB file before exiting. */
2833     if ((server.saveparamslen > 0 && !nosave) || save) {
2834         serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
2835         /* Snapshotting. Perform a SYNC SAVE and exit */
2836         rdbSaveInfo rsi, *rsiptr;
2837         rsiptr = rdbPopulateSaveInfo(&rsi);
2838         if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
2839             /* Ooops.. error saving! The best we can do is to continue
2840              * operating. Note that if there was a background saving process,
2841              * in the next cron() Redis will be notified that the background
2842              * saving aborted, handling special stuff like slaves pending for
2843              * synchronization... */
2844             serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
2845             return C_ERR;
2846         }
2847     }
2848 
2849     /* Remove the pid file if possible and needed. */
2850     if (server.daemonize || server.pidfile) {
2851         serverLog(LL_NOTICE,"Removing the pid file.");
2852         unlink(server.pidfile);
2853     }
2854 
2855     /* Best effort flush of slave output buffers, so that we hopefully
2856      * send them pending writes. */
2857     flushSlavesOutputBuffers();
2858 
2859     /* Close the listening sockets. Apparently this allows faster restarts. */
2860     closeListeningSockets(1);
2861     serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
2862         server.sentinel_mode ? "Sentinel" : "Redis");
2863     return C_OK;
2864 }
2865 
2866 /*================================== Commands =============================== */
2867 
2868 /* Sometimes Redis cannot accept write commands because there is a perstence
2869  * error with the RDB or AOF file, and Redis is configured in order to stop
2870  * accepting writes in such situation. This function returns if such a
2871  * condition is active, and the type of the condition.
2872  *
2873  * Function return values:
2874  *
2875  * DISK_ERROR_TYPE_NONE:    No problems, we can accept writes.
2876  * DISK_ERROR_TYPE_AOF:     Don't accept writes: AOF errors.
2877  * DISK_ERROR_TYPE_RDB:     Don't accept writes: RDB errors.
2878  */
writeCommandsDeniedByDiskError(void)2879 int writeCommandsDeniedByDiskError(void) {
2880     if (server.stop_writes_on_bgsave_err &&
2881         server.saveparamslen > 0 &&
2882         server.lastbgsave_status == C_ERR)
2883     {
2884         return DISK_ERROR_TYPE_RDB;
2885     } else if (server.aof_state != AOF_OFF &&
2886                server.aof_last_write_status == C_ERR)
2887     {
2888         return DISK_ERROR_TYPE_AOF;
2889     } else {
2890         return DISK_ERROR_TYPE_NONE;
2891     }
2892 }
2893 
2894 /* Return zero if strings are the same, non-zero if they are not.
2895  * The comparison is performed in a way that prevents an attacker to obtain
2896  * information about the nature of the strings just monitoring the execution
2897  * time of the function.
2898  *
2899  * Note that limiting the comparison length to strings up to 512 bytes we
2900  * can avoid leaking any information about the password length and any
2901  * possible branch misprediction related leak.
2902  */
time_independent_strcmp(char * a,char * b)2903 int time_independent_strcmp(char *a, char *b) {
2904     char bufa[CONFIG_AUTHPASS_MAX_LEN], bufb[CONFIG_AUTHPASS_MAX_LEN];
2905     /* The above two strlen perform len(a) + len(b) operations where either
2906      * a or b are fixed (our password) length, and the difference is only
2907      * relative to the length of the user provided string, so no information
2908      * leak is possible in the following two lines of code. */
2909     unsigned int alen = strlen(a);
2910     unsigned int blen = strlen(b);
2911     unsigned int j;
2912     int diff = 0;
2913 
2914     /* We can't compare strings longer than our static buffers.
2915      * Note that this will never pass the first test in practical circumstances
2916      * so there is no info leak. */
2917     if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;
2918 
2919     memset(bufa,0,sizeof(bufa));        /* Constant time. */
2920     memset(bufb,0,sizeof(bufb));        /* Constant time. */
2921     /* Again the time of the following two copies is proportional to
2922      * len(a) + len(b) so no info is leaked. */
2923     memcpy(bufa,a,alen);
2924     memcpy(bufb,b,blen);
2925 
2926     /* Always compare all the chars in the two buffers without
2927      * conditional expressions. */
2928     for (j = 0; j < sizeof(bufa); j++) {
2929         diff |= (bufa[j] ^ bufb[j]);
2930     }
2931     /* Length must be equal as well. */
2932     diff |= alen ^ blen;
2933     return diff; /* If zero strings are the same. */
2934 }
2935 
/* AUTH <password>: check the password against server.requirepass using a
 * timing-independent comparison. The client's 'authenticated' flag is set
 * on success and cleared on failure. */
void authCommand(client *c) {
    if (server.requirepass == NULL) {
        addReplyError(c,"Client sent AUTH, but no password is set");
        return;
    }

    int matches =
        time_independent_strcmp(c->argv[1]->ptr, server.requirepass) == 0;
    c->authenticated = matches;
    if (matches)
        addReply(c,shared.ok);
    else
        addReplyError(c,"invalid password");
}
2947 
2948 /* The PING command. It works in a different way if the client is in
2949  * in Pub/Sub mode. */
pingCommand(client * c)2950 void pingCommand(client *c) {
2951     /* The command takes zero or one arguments. */
2952     if (c->argc > 2) {
2953         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2954             c->cmd->name);
2955         return;
2956     }
2957 
2958     if (c->flags & CLIENT_PUBSUB) {
2959         addReply(c,shared.mbulkhdr[2]);
2960         addReplyBulkCBuffer(c,"pong",4);
2961         if (c->argc == 1)
2962             addReplyBulkCBuffer(c,"",0);
2963         else
2964             addReplyBulk(c,c->argv[1]);
2965     } else {
2966         if (c->argc == 1)
2967             addReply(c,shared.pong);
2968         else
2969             addReplyBulk(c,c->argv[1]);
2970     }
2971 }
2972 
/* ECHO <message>: reply with the message as a bulk string. */
void echoCommand(client *c) {
    robj *message = c->argv[1];
    addReplyBulk(c,message);
}
2976 
/* TIME: reply with a two element array holding the tv_sec and tv_usec
 * fields returned by gettimeofday(). */
void timeCommand(client *c) {
    struct timeval now;

    /* gettimeofday() can only fail when given a bad address, so its
     * return value is intentionally not checked. */
    gettimeofday(&now,NULL);

    long long secs = now.tv_sec;
    long long usecs = now.tv_usec;
    addReplyMultiBulkLen(c,2);
    addReplyBulkLongLong(c,secs);
    addReplyBulkLongLong(c,usecs);
}
2987 
2988 /* Helper function for addReplyCommand() to output flags. */
addReplyCommandFlag(client * c,struct redisCommand * cmd,int f,char * reply)2989 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) {
2990     if (cmd->flags & f) {
2991         addReplyStatus(c, reply);
2992         return 1;
2993     }
2994     return 0;
2995 }
2996 
2997 /* Output the representation of a Redis command. Used by the COMMAND command. */
addReplyCommand(client * c,struct redisCommand * cmd)2998 void addReplyCommand(client *c, struct redisCommand *cmd) {
2999     if (!cmd) {
3000         addReply(c, shared.nullbulk);
3001     } else {
3002         /* We are adding: command name, arg count, flags, first, last, offset */
3003         addReplyMultiBulkLen(c, 6);
3004         addReplyBulkCString(c, cmd->name);
3005         addReplyLongLong(c, cmd->arity);
3006 
3007         int flagcount = 0;
3008         void *flaglen = addDeferredMultiBulkLength(c);
3009         flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write");
3010         flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly");
3011         flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom");
3012         flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin");
3013         flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub");
3014         flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript");
3015         flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random");
3016         flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script");
3017         flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading");
3018         flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale");
3019         flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor");
3020         flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
3021         flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
3022         if ((cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) ||
3023             cmd->flags & CMD_MODULE_GETKEYS)
3024         {
3025             addReplyStatus(c, "movablekeys");
3026             flagcount += 1;
3027         }
3028         setDeferredMultiBulkLength(c, flaglen, flagcount);
3029 
3030         addReplyLongLong(c, cmd->firstkey);
3031         addReplyLongLong(c, cmd->lastkey);
3032         addReplyLongLong(c, cmd->keystep);
3033     }
3034 }
3035 
3036 /* COMMAND <subcommand> <args> */
commandCommand(client * c)3037 void commandCommand(client *c) {
3038     dictIterator *di;
3039     dictEntry *de;
3040 
3041     if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
3042         const char *help[] = {
3043 "(no subcommand) -- Return details about all Redis commands.",
3044 "COUNT -- Return the total number of commands in this Redis server.",
3045 "GETKEYS <full-command> -- Return the keys from a full Redis command.",
3046 "INFO [command-name ...] -- Return details about multiple Redis commands.",
3047 NULL
3048         };
3049         addReplyHelp(c, help);
3050     } else if (c->argc == 1) {
3051         addReplyMultiBulkLen(c, dictSize(server.commands));
3052         di = dictGetIterator(server.commands);
3053         while ((de = dictNext(di)) != NULL) {
3054             addReplyCommand(c, dictGetVal(de));
3055         }
3056         dictReleaseIterator(di);
3057     } else if (!strcasecmp(c->argv[1]->ptr, "info")) {
3058         int i;
3059         addReplyMultiBulkLen(c, c->argc-2);
3060         for (i = 2; i < c->argc; i++) {
3061             addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr));
3062         }
3063     } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) {
3064         addReplyLongLong(c, dictSize(server.commands));
3065     } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) {
3066         struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
3067         int *keys, numkeys, j;
3068 
3069         if (!cmd) {
3070             addReplyError(c,"Invalid command specified");
3071             return;
3072         } else if (cmd->getkeys_proc == NULL && cmd->firstkey == 0) {
3073             addReplyError(c,"The command has no key arguments");
3074             return;
3075         } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) ||
3076                    ((c->argc-2) < -cmd->arity))
3077         {
3078             addReplyError(c,"Invalid number of arguments specified for command");
3079             return;
3080         }
3081 
3082         keys = getKeysFromCommand(cmd,c->argv+2,c->argc-2,&numkeys);
3083         if (!keys) {
3084             addReplyError(c,"Invalid arguments specified for command");
3085         } else {
3086             addReplyMultiBulkLen(c,numkeys);
3087             for (j = 0; j < numkeys; j++) addReplyBulk(c,c->argv[keys[j]+2]);
3088             getKeysFreeResult(keys);
3089         }
3090     } else {
3091         addReplySubcommandSyntaxError(c);
3092     }
3093 }
3094 
/* Write a human readable form of the byte count 'n' into 's', e.g. 100B,
 * 4.00K, 2.50G. Units are powers of 1024 up to P. Counts of 1024P or more
 * fall back to a plain byte count. 's' must be large enough for the result
 * (callers use 64-byte buffers). */
void bytesToHuman(char *s, unsigned long long n) {
    static const char units[] = "KMGTP";
    unsigned long long base = 1024;
    int i;

    if (n < 1024) {
        sprintf(s,"%lluB",n);
        return;
    }

    /* Pick the first unit whose next power of 1024 is still above n. */
    for (i = 0; units[i] != '\0'; i++) {
        if (n < base*1024) {
            sprintf(s,"%.2f%c",(double)n/base,units[i]);
            return;
        }
        base *= 1024;
    }

    /* Let's hope we never need this */
    sprintf(s,"%lluB",n);
}
3123 
3124 /* Create the string returned by the INFO command. This is decoupled
3125  * by the INFO command itself as we need to report the same information
3126  * on memory corruption problems. */
genRedisInfoString(char * section)3127 sds genRedisInfoString(char *section) {
3128     sds info = sdsempty();
3129     time_t uptime = server.unixtime-server.stat_starttime;
3130     int j;
3131     struct rusage self_ru, c_ru;
3132     int allsections = 0, defsections = 0;
3133     int sections = 0;
3134 
3135     if (section == NULL) section = "default";
3136     allsections = strcasecmp(section,"all") == 0;
3137     defsections = strcasecmp(section,"default") == 0;
3138 
3139     getrusage(RUSAGE_SELF, &self_ru);
3140     getrusage(RUSAGE_CHILDREN, &c_ru);
3141 
3142     /* Server */
3143     if (allsections || defsections || !strcasecmp(section,"server")) {
3144         static int call_uname = 1;
3145         static struct utsname name;
3146         char *mode;
3147 
3148         if (server.cluster_enabled) mode = "cluster";
3149         else if (server.sentinel_mode) mode = "sentinel";
3150         else mode = "standalone";
3151 
3152         if (sections++) info = sdscat(info,"\r\n");
3153 
3154         if (call_uname) {
3155             /* Uname can be slow and is always the same output. Cache it. */
3156             uname(&name);
3157             call_uname = 0;
3158         }
3159 
3160         unsigned int lruclock;
3161         atomicGet(server.lruclock,lruclock);
3162         info = sdscatprintf(info,
3163             "# Server\r\n"
3164             "redis_version:%s\r\n"
3165             "redis_git_sha1:%s\r\n"
3166             "redis_git_dirty:%d\r\n"
3167             "redis_build_id:%llx\r\n"
3168             "redis_mode:%s\r\n"
3169             "os:%s %s %s\r\n"
3170             "arch_bits:%d\r\n"
3171             "multiplexing_api:%s\r\n"
3172             "atomicvar_api:%s\r\n"
3173             "gcc_version:%d.%d.%d\r\n"
3174             "process_id:%ld\r\n"
3175             "run_id:%s\r\n"
3176             "tcp_port:%d\r\n"
3177             "uptime_in_seconds:%jd\r\n"
3178             "uptime_in_days:%jd\r\n"
3179             "hz:%d\r\n"
3180             "configured_hz:%d\r\n"
3181             "lru_clock:%ld\r\n"
3182             "executable:%s\r\n"
3183             "config_file:%s\r\n",
3184             REDIS_VERSION,
3185             redisGitSHA1(),
3186             strtol(redisGitDirty(),NULL,10) > 0,
3187             (unsigned long long) redisBuildId(),
3188             mode,
3189             name.sysname, name.release, name.machine,
3190             server.arch_bits,
3191             aeGetApiName(),
3192             REDIS_ATOMIC_API,
3193 #ifdef __GNUC__
3194             __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
3195 #else
3196             0,0,0,
3197 #endif
3198             (long) getpid(),
3199             server.runid,
3200             server.port,
3201             (intmax_t)uptime,
3202             (intmax_t)(uptime/(3600*24)),
3203             server.hz,
3204             server.config_hz,
3205             (unsigned long) lruclock,
3206             server.executable ? server.executable : "",
3207             server.configfile ? server.configfile : "");
3208     }
3209 
3210     /* Clients */
3211     if (allsections || defsections || !strcasecmp(section,"clients")) {
3212         size_t maxin, maxout;
3213         getExpansiveClientsInfo(&maxin,&maxout);
3214         if (sections++) info = sdscat(info,"\r\n");
3215         info = sdscatprintf(info,
3216             "# Clients\r\n"
3217             "connected_clients:%lu\r\n"
3218             "client_recent_max_input_buffer:%zu\r\n"
3219             "client_recent_max_output_buffer:%zu\r\n"
3220             "blocked_clients:%d\r\n",
3221             listLength(server.clients)-listLength(server.slaves),
3222             maxin, maxout,
3223             server.blocked_clients);
3224     }
3225 
3226     /* Memory */
3227     if (allsections || defsections || !strcasecmp(section,"memory")) {
3228         char hmem[64];
3229         char peak_hmem[64];
3230         char total_system_hmem[64];
3231         char used_memory_lua_hmem[64];
3232         char used_memory_scripts_hmem[64];
3233         char used_memory_rss_hmem[64];
3234         char maxmemory_hmem[64];
3235         size_t zmalloc_used = zmalloc_used_memory();
3236         size_t total_system_mem = server.system_memory_size;
3237         const char *evict_policy = evictPolicyToString();
3238         long long memory_lua = (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024;
3239         struct redisMemOverhead *mh = getMemoryOverheadData();
3240 
3241         /* Peak memory is updated from time to time by serverCron() so it
3242          * may happen that the instantaneous value is slightly bigger than
3243          * the peak value. This may confuse users, so we update the peak
3244          * if found smaller than the current memory usage. */
3245         if (zmalloc_used > server.stat_peak_memory)
3246             server.stat_peak_memory = zmalloc_used;
3247 
3248         bytesToHuman(hmem,zmalloc_used);
3249         bytesToHuman(peak_hmem,server.stat_peak_memory);
3250         bytesToHuman(total_system_hmem,total_system_mem);
3251         bytesToHuman(used_memory_lua_hmem,memory_lua);
3252         bytesToHuman(used_memory_scripts_hmem,mh->lua_caches);
3253         bytesToHuman(used_memory_rss_hmem,server.cron_malloc_stats.process_rss);
3254         bytesToHuman(maxmemory_hmem,server.maxmemory);
3255 
3256         if (sections++) info = sdscat(info,"\r\n");
3257         info = sdscatprintf(info,
3258             "# Memory\r\n"
3259             "used_memory:%zu\r\n"
3260             "used_memory_human:%s\r\n"
3261             "used_memory_rss:%zu\r\n"
3262             "used_memory_rss_human:%s\r\n"
3263             "used_memory_peak:%zu\r\n"
3264             "used_memory_peak_human:%s\r\n"
3265             "used_memory_peak_perc:%.2f%%\r\n"
3266             "used_memory_overhead:%zu\r\n"
3267             "used_memory_startup:%zu\r\n"
3268             "used_memory_dataset:%zu\r\n"
3269             "used_memory_dataset_perc:%.2f%%\r\n"
3270             "allocator_allocated:%zu\r\n"
3271             "allocator_active:%zu\r\n"
3272             "allocator_resident:%zu\r\n"
3273             "total_system_memory:%lu\r\n"
3274             "total_system_memory_human:%s\r\n"
3275             "used_memory_lua:%lld\r\n"
3276             "used_memory_lua_human:%s\r\n"
3277             "used_memory_scripts:%lld\r\n"
3278             "used_memory_scripts_human:%s\r\n"
3279             "number_of_cached_scripts:%lu\r\n"
3280             "maxmemory:%lld\r\n"
3281             "maxmemory_human:%s\r\n"
3282             "maxmemory_policy:%s\r\n"
3283             "allocator_frag_ratio:%.2f\r\n"
3284             "allocator_frag_bytes:%zu\r\n"
3285             "allocator_rss_ratio:%.2f\r\n"
3286             "allocator_rss_bytes:%zd\r\n"
3287             "rss_overhead_ratio:%.2f\r\n"
3288             "rss_overhead_bytes:%zd\r\n"
3289             "mem_fragmentation_ratio:%.2f\r\n"
3290             "mem_fragmentation_bytes:%zd\r\n"
3291             "mem_not_counted_for_evict:%zu\r\n"
3292             "mem_replication_backlog:%zu\r\n"
3293             "mem_clients_slaves:%zu\r\n"
3294             "mem_clients_normal:%zu\r\n"
3295             "mem_aof_buffer:%zu\r\n"
3296             "mem_allocator:%s\r\n"
3297             "active_defrag_running:%d\r\n"
3298             "lazyfree_pending_objects:%zu\r\n",
3299             zmalloc_used,
3300             hmem,
3301             server.cron_malloc_stats.process_rss,
3302             used_memory_rss_hmem,
3303             server.stat_peak_memory,
3304             peak_hmem,
3305             mh->peak_perc,
3306             mh->overhead_total,
3307             mh->startup_allocated,
3308             mh->dataset,
3309             mh->dataset_perc,
3310             server.cron_malloc_stats.allocator_allocated,
3311             server.cron_malloc_stats.allocator_active,
3312             server.cron_malloc_stats.allocator_resident,
3313             (unsigned long)total_system_mem,
3314             total_system_hmem,
3315             memory_lua,
3316             used_memory_lua_hmem,
3317             (long long) mh->lua_caches,
3318             used_memory_scripts_hmem,
3319             dictSize(server.lua_scripts),
3320             server.maxmemory,
3321             maxmemory_hmem,
3322             evict_policy,
3323             mh->allocator_frag,
3324             mh->allocator_frag_bytes,
3325             mh->allocator_rss,
3326             mh->allocator_rss_bytes,
3327             mh->rss_extra,
3328             mh->rss_extra_bytes,
3329             mh->total_frag, /* this is the total RSS overhead, including fragmentation, */
3330             mh->total_frag_bytes, /* named so for backwards compatibility */
3331             freeMemoryGetNotCountedMemory(),
3332             mh->repl_backlog,
3333             mh->clients_slaves,
3334             mh->clients_normal,
3335             mh->aof_buffer,
3336             ZMALLOC_LIB,
3337             server.active_defrag_running,
3338             lazyfreeGetPendingObjectsCount()
3339         );
3340         freeMemoryOverheadData(mh);
3341     }
3342 
3343     /* Persistence */
3344     if (allsections || defsections || !strcasecmp(section,"persistence")) {
3345         if (sections++) info = sdscat(info,"\r\n");
3346         info = sdscatprintf(info,
3347             "# Persistence\r\n"
3348             "loading:%d\r\n"
3349             "rdb_changes_since_last_save:%lld\r\n"
3350             "rdb_bgsave_in_progress:%d\r\n"
3351             "rdb_last_save_time:%jd\r\n"
3352             "rdb_last_bgsave_status:%s\r\n"
3353             "rdb_last_bgsave_time_sec:%jd\r\n"
3354             "rdb_current_bgsave_time_sec:%jd\r\n"
3355             "rdb_last_cow_size:%zu\r\n"
3356             "aof_enabled:%d\r\n"
3357             "aof_rewrite_in_progress:%d\r\n"
3358             "aof_rewrite_scheduled:%d\r\n"
3359             "aof_last_rewrite_time_sec:%jd\r\n"
3360             "aof_current_rewrite_time_sec:%jd\r\n"
3361             "aof_last_bgrewrite_status:%s\r\n"
3362             "aof_last_write_status:%s\r\n"
3363             "aof_last_cow_size:%zu\r\n",
3364             server.loading,
3365             server.dirty,
3366             server.rdb_child_pid != -1,
3367             (intmax_t)server.lastsave,
3368             (server.lastbgsave_status == C_OK) ? "ok" : "err",
3369             (intmax_t)server.rdb_save_time_last,
3370             (intmax_t)((server.rdb_child_pid == -1) ?
3371                 -1 : time(NULL)-server.rdb_save_time_start),
3372             server.stat_rdb_cow_bytes,
3373             server.aof_state != AOF_OFF,
3374             server.aof_child_pid != -1,
3375             server.aof_rewrite_scheduled,
3376             (intmax_t)server.aof_rewrite_time_last,
3377             (intmax_t)((server.aof_child_pid == -1) ?
3378                 -1 : time(NULL)-server.aof_rewrite_time_start),
3379             (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
3380             (server.aof_last_write_status == C_OK) ? "ok" : "err",
3381             server.stat_aof_cow_bytes);
3382 
3383         if (server.aof_state != AOF_OFF) {
3384             info = sdscatprintf(info,
3385                 "aof_current_size:%lld\r\n"
3386                 "aof_base_size:%lld\r\n"
3387                 "aof_pending_rewrite:%d\r\n"
3388                 "aof_buffer_length:%zu\r\n"
3389                 "aof_rewrite_buffer_length:%lu\r\n"
3390                 "aof_pending_bio_fsync:%llu\r\n"
3391                 "aof_delayed_fsync:%lu\r\n",
3392                 (long long) server.aof_current_size,
3393                 (long long) server.aof_rewrite_base_size,
3394                 server.aof_rewrite_scheduled,
3395                 sdslen(server.aof_buf),
3396                 aofRewriteBufferSize(),
3397                 bioPendingJobsOfType(BIO_AOF_FSYNC),
3398                 server.aof_delayed_fsync);
3399         }
3400 
3401         if (server.loading) {
3402             double perc;
3403             time_t eta, elapsed;
3404             off_t remaining_bytes = server.loading_total_bytes-
3405                                     server.loading_loaded_bytes;
3406 
3407             perc = ((double)server.loading_loaded_bytes /
3408                    (server.loading_total_bytes+1)) * 100;
3409 
3410             elapsed = time(NULL)-server.loading_start_time;
3411             if (elapsed == 0) {
3412                 eta = 1; /* A fake 1 second figure if we don't have
3413                             enough info */
3414             } else {
3415                 eta = (elapsed*remaining_bytes)/(server.loading_loaded_bytes+1);
3416             }
3417 
3418             info = sdscatprintf(info,
3419                 "loading_start_time:%jd\r\n"
3420                 "loading_total_bytes:%llu\r\n"
3421                 "loading_loaded_bytes:%llu\r\n"
3422                 "loading_loaded_perc:%.2f\r\n"
3423                 "loading_eta_seconds:%jd\r\n",
3424                 (intmax_t) server.loading_start_time,
3425                 (unsigned long long) server.loading_total_bytes,
3426                 (unsigned long long) server.loading_loaded_bytes,
3427                 perc,
3428                 (intmax_t)eta
3429             );
3430         }
3431     }
3432 
3433     /* Stats */
3434     if (allsections || defsections || !strcasecmp(section,"stats")) {
3435         if (sections++) info = sdscat(info,"\r\n");
3436         info = sdscatprintf(info,
3437             "# Stats\r\n"
3438             "total_connections_received:%lld\r\n"
3439             "total_commands_processed:%lld\r\n"
3440             "instantaneous_ops_per_sec:%lld\r\n"
3441             "total_net_input_bytes:%lld\r\n"
3442             "total_net_output_bytes:%lld\r\n"
3443             "instantaneous_input_kbps:%.2f\r\n"
3444             "instantaneous_output_kbps:%.2f\r\n"
3445             "rejected_connections:%lld\r\n"
3446             "sync_full:%lld\r\n"
3447             "sync_partial_ok:%lld\r\n"
3448             "sync_partial_err:%lld\r\n"
3449             "expired_keys:%lld\r\n"
3450             "expired_stale_perc:%.2f\r\n"
3451             "expired_time_cap_reached_count:%lld\r\n"
3452             "evicted_keys:%lld\r\n"
3453             "keyspace_hits:%lld\r\n"
3454             "keyspace_misses:%lld\r\n"
3455             "pubsub_channels:%ld\r\n"
3456             "pubsub_patterns:%lu\r\n"
3457             "latest_fork_usec:%lld\r\n"
3458             "migrate_cached_sockets:%ld\r\n"
3459             "slave_expires_tracked_keys:%zu\r\n"
3460             "active_defrag_hits:%lld\r\n"
3461             "active_defrag_misses:%lld\r\n"
3462             "active_defrag_key_hits:%lld\r\n"
3463             "active_defrag_key_misses:%lld\r\n",
3464             server.stat_numconnections,
3465             server.stat_numcommands,
3466             getInstantaneousMetric(STATS_METRIC_COMMAND),
3467             server.stat_net_input_bytes,
3468             server.stat_net_output_bytes,
3469             (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
3470             (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
3471             server.stat_rejected_conn,
3472             server.stat_sync_full,
3473             server.stat_sync_partial_ok,
3474             server.stat_sync_partial_err,
3475             server.stat_expiredkeys,
3476             server.stat_expired_stale_perc*100,
3477             server.stat_expired_time_cap_reached_count,
3478             server.stat_evictedkeys,
3479             server.stat_keyspace_hits,
3480             server.stat_keyspace_misses,
3481             dictSize(server.pubsub_channels),
3482             listLength(server.pubsub_patterns),
3483             server.stat_fork_time,
3484             dictSize(server.migrate_cached_sockets),
3485             getSlaveKeyWithExpireCount(),
3486             server.stat_active_defrag_hits,
3487             server.stat_active_defrag_misses,
3488             server.stat_active_defrag_key_hits,
3489             server.stat_active_defrag_key_misses);
3490     }
3491 
3492     /* Replication */
3493     if (allsections || defsections || !strcasecmp(section,"replication")) {
3494         if (sections++) info = sdscat(info,"\r\n");
3495         info = sdscatprintf(info,
3496             "# Replication\r\n"
3497             "role:%s\r\n",
3498             server.masterhost == NULL ? "master" : "slave");
3499         if (server.masterhost) {
3500             long long slave_repl_offset = 1;
3501 
3502             if (server.master)
3503                 slave_repl_offset = server.master->reploff;
3504             else if (server.cached_master)
3505                 slave_repl_offset = server.cached_master->reploff;
3506 
3507             info = sdscatprintf(info,
3508                 "master_host:%s\r\n"
3509                 "master_port:%d\r\n"
3510                 "master_link_status:%s\r\n"
3511                 "master_last_io_seconds_ago:%d\r\n"
3512                 "master_sync_in_progress:%d\r\n"
3513                 "slave_repl_offset:%lld\r\n"
3514                 ,server.masterhost,
3515                 server.masterport,
3516                 (server.repl_state == REPL_STATE_CONNECTED) ?
3517                     "up" : "down",
3518                 server.master ?
3519                 ((int)(server.unixtime-server.master->lastinteraction)) : -1,
3520                 server.repl_state == REPL_STATE_TRANSFER,
3521                 slave_repl_offset
3522             );
3523 
3524             if (server.repl_state == REPL_STATE_TRANSFER) {
3525                 info = sdscatprintf(info,
3526                     "master_sync_left_bytes:%lld\r\n"
3527                     "master_sync_last_io_seconds_ago:%d\r\n"
3528                     , (long long)
3529                         (server.repl_transfer_size - server.repl_transfer_read),
3530                     (int)(server.unixtime-server.repl_transfer_lastio)
3531                 );
3532             }
3533 
3534             if (server.repl_state != REPL_STATE_CONNECTED) {
3535                 info = sdscatprintf(info,
3536                     "master_link_down_since_seconds:%jd\r\n",
3537                     (intmax_t)server.unixtime-server.repl_down_since);
3538             }
3539             info = sdscatprintf(info,
3540                 "slave_priority:%d\r\n"
3541                 "slave_read_only:%d\r\n",
3542                 server.slave_priority,
3543                 server.repl_slave_ro);
3544         }
3545 
3546         info = sdscatprintf(info,
3547             "connected_slaves:%lu\r\n",
3548             listLength(server.slaves));
3549 
3550         /* If min-slaves-to-write is active, write the number of slaves
3551          * currently considered 'good'. */
3552         if (server.repl_min_slaves_to_write &&
3553             server.repl_min_slaves_max_lag) {
3554             info = sdscatprintf(info,
3555                 "min_slaves_good_slaves:%d\r\n",
3556                 server.repl_good_slaves_count);
3557         }
3558 
3559         if (listLength(server.slaves)) {
3560             int slaveid = 0;
3561             listNode *ln;
3562             listIter li;
3563 
3564             listRewind(server.slaves,&li);
3565             while((ln = listNext(&li))) {
3566                 client *slave = listNodeValue(ln);
3567                 char *state = NULL;
3568                 char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
3569                 int port;
3570                 long lag = 0;
3571 
3572                 if (slaveip[0] == '\0') {
3573                     if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1)
3574                         continue;
3575                     slaveip = ip;
3576                 }
3577                 switch(slave->replstate) {
3578                 case SLAVE_STATE_WAIT_BGSAVE_START:
3579                 case SLAVE_STATE_WAIT_BGSAVE_END:
3580                     state = "wait_bgsave";
3581                     break;
3582                 case SLAVE_STATE_SEND_BULK:
3583                     state = "send_bulk";
3584                     break;
3585                 case SLAVE_STATE_ONLINE:
3586                     state = "online";
3587                     break;
3588                 }
3589                 if (state == NULL) continue;
3590                 if (slave->replstate == SLAVE_STATE_ONLINE)
3591                     lag = time(NULL) - slave->repl_ack_time;
3592 
3593                 info = sdscatprintf(info,
3594                     "slave%d:ip=%s,port=%d,state=%s,"
3595                     "offset=%lld,lag=%ld\r\n",
3596                     slaveid,slaveip,slave->slave_listening_port,state,
3597                     slave->repl_ack_off, lag);
3598                 slaveid++;
3599             }
3600         }
3601         info = sdscatprintf(info,
3602             "master_replid:%s\r\n"
3603             "master_replid2:%s\r\n"
3604             "master_repl_offset:%lld\r\n"
3605             "second_repl_offset:%lld\r\n"
3606             "repl_backlog_active:%d\r\n"
3607             "repl_backlog_size:%lld\r\n"
3608             "repl_backlog_first_byte_offset:%lld\r\n"
3609             "repl_backlog_histlen:%lld\r\n",
3610             server.replid,
3611             server.replid2,
3612             server.master_repl_offset,
3613             server.second_replid_offset,
3614             server.repl_backlog != NULL,
3615             server.repl_backlog_size,
3616             server.repl_backlog_off,
3617             server.repl_backlog_histlen);
3618     }
3619 
3620     /* CPU */
3621     if (allsections || defsections || !strcasecmp(section,"cpu")) {
3622         if (sections++) info = sdscat(info,"\r\n");
3623         info = sdscatprintf(info,
3624         "# CPU\r\n"
3625         "used_cpu_sys:%ld.%06ld\r\n"
3626         "used_cpu_user:%ld.%06ld\r\n"
3627         "used_cpu_sys_children:%ld.%06ld\r\n"
3628         "used_cpu_user_children:%ld.%06ld\r\n",
3629         (long)self_ru.ru_stime.tv_sec, (long)self_ru.ru_stime.tv_usec,
3630         (long)self_ru.ru_utime.tv_sec, (long)self_ru.ru_utime.tv_usec,
3631         (long)c_ru.ru_stime.tv_sec, (long)c_ru.ru_stime.tv_usec,
3632         (long)c_ru.ru_utime.tv_sec, (long)c_ru.ru_utime.tv_usec);
3633     }
3634 
3635     /* Command statistics */
3636     if (allsections || !strcasecmp(section,"commandstats")) {
3637         if (sections++) info = sdscat(info,"\r\n");
3638         info = sdscatprintf(info, "# Commandstats\r\n");
3639 
3640         struct redisCommand *c;
3641         dictEntry *de;
3642         dictIterator *di;
3643         di = dictGetSafeIterator(server.commands);
3644         while((de = dictNext(di)) != NULL) {
3645             c = (struct redisCommand *) dictGetVal(de);
3646             if (!c->calls) continue;
3647             info = sdscatprintf(info,
3648                 "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n",
3649                 c->name, c->calls, c->microseconds,
3650                 (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls));
3651         }
3652         dictReleaseIterator(di);
3653     }
3654 
3655     /* Cluster */
3656     if (allsections || defsections || !strcasecmp(section,"cluster")) {
3657         if (sections++) info = sdscat(info,"\r\n");
3658         info = sdscatprintf(info,
3659         "# Cluster\r\n"
3660         "cluster_enabled:%d\r\n",
3661         server.cluster_enabled);
3662     }
3663 
3664     /* Key space */
3665     if (allsections || defsections || !strcasecmp(section,"keyspace")) {
3666         if (sections++) info = sdscat(info,"\r\n");
3667         info = sdscatprintf(info, "# Keyspace\r\n");
3668         for (j = 0; j < server.dbnum; j++) {
3669             long long keys, vkeys;
3670 
3671             keys = dictSize(server.db[j].dict);
3672             vkeys = dictSize(server.db[j].expires);
3673             if (keys || vkeys) {
3674                 info = sdscatprintf(info,
3675                     "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
3676                     j, keys, vkeys, server.db[j].avg_ttl);
3677             }
3678         }
3679     }
3680     return info;
3681 }
3682 
/* INFO [section]
 *
 * Replies with the output of genRedisInfoString() for the requested section
 * as a bulk string. With no argument the "default" section set is used; more
 * than one argument is rejected with a syntax error. */
void infoCommand(client *c) {
    char *section;

    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    }

    section = (c->argc == 2) ? c->argv[1]->ptr : "default";
    addReplyBulkSds(c, genRedisInfoString(section));
}
3692 
/* MONITOR
 *
 * Marks the calling client as a monitor and appends it to server.monitors,
 * replying +OK. A client already flagged CLIENT_SLAVE (a replica, or a
 * client that is already monitoring) is left untouched and gets no reply. */
void monitorCommand(client *c) {
    int already_slave = (c->flags & CLIENT_SLAVE) != 0;

    if (!already_slave) {
        c->flags |= CLIENT_SLAVE|CLIENT_MONITOR;
        listAddNodeTail(server.monitors,c);
        addReply(c,shared.ok);
    }
}
3701 
3702 /* =================================== Main! ================================ */
3703 
3704 #ifdef __linux__
/* Read the kernel's vm.overcommit_memory setting.
 *
 * Returns the integer parsed (via atoi) from the first line of
 * /proc/sys/vm/overcommit_memory, or -1 if the file cannot be opened or
 * nothing can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *f = fopen("/proc/sys/vm/overcommit_memory","r");

    if (f == NULL) return -1;
    if (fgets(line,sizeof(line),f) != NULL) value = atoi(line);
    fclose(f);
    return value;
}
3718 
linuxMemoryWarnings(void)3719 void linuxMemoryWarnings(void) {
3720     if (linuxOvercommitMemoryValue() == 0) {
3721         serverLog(LL_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
3722     }
3723     if (THPIsEnabled()) {
3724         serverLog(LL_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.");
3725     }
3726 }
3727 #endif /* __linux__ */
3728 
createPidFile(void)3729 void createPidFile(void) {
3730     /* If pidfile requested, but no pidfile defined, use
3731      * default pidfile path */
3732     if (!server.pidfile) server.pidfile = zstrdup(CONFIG_DEFAULT_PID_FILE);
3733 
3734     /* Try to write the pid file in a best-effort way. */
3735     FILE *fp = fopen(server.pidfile,"w");
3736     if (fp) {
3737         fprintf(fp,"%d\n",(int)getpid());
3738         fclose(fp);
3739     }
3740 }
3741 
daemonize(void)3742 void daemonize(void) {
3743     int fd;
3744 
3745     if (fork() != 0) exit(0); /* parent exits */
3746     setsid(); /* create a new session */
3747 
3748     /* Every output goes to /dev/null. If Redis is daemonized but
3749      * the 'logfile' is set to 'stdout' in the configuration file
3750      * it will not log at all. */
3751     if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
3752         dup2(fd, STDIN_FILENO);
3753         dup2(fd, STDOUT_FILENO);
3754         dup2(fd, STDERR_FILENO);
3755         if (fd > STDERR_FILENO) close(fd);
3756     }
3757 }
3758 
version(void)3759 void version(void) {
3760     printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n",
3761         REDIS_VERSION,
3762         redisGitSHA1(),
3763         atoi(redisGitDirty()) > 0,
3764         ZMALLOC_LIB,
3765         sizeof(long) == 4 ? 32 : 64,
3766         (unsigned long long) redisBuildId());
3767     exit(0);
3768 }
3769 
/* Print command-line usage help to stderr and exit with status 1. */
void usage(void) {
    fputs(
        "Usage: ./redis-server [/path/to/redis.conf] [options]\n"
        "       ./redis-server - (read config from stdin)\n"
        "       ./redis-server -v or --version\n"
        "       ./redis-server -h or --help\n"
        "       ./redis-server --test-memory <megabytes>\n\n"
        "Examples:\n"
        "       ./redis-server (run the server with default conf)\n"
        "       ./redis-server /etc/redis/6379.conf\n"
        "       ./redis-server --port 7777\n"
        "       ./redis-server --port 7777 --replicaof 127.0.0.1 8888\n"
        "       ./redis-server /etc/myredis.conf --loglevel verbose\n\n"
        "Sentinel mode:\n"
        "       ./redis-server /etc/sentinel.conf --sentinel\n",
        stderr);
    exit(1);
}
3786 
redisAsciiArt(void)3787 void redisAsciiArt(void) {
3788 #include "asciilogo.h"
3789     char *buf = zmalloc(1024*16);
3790     char *mode;
3791 
3792     if (server.cluster_enabled) mode = "cluster";
3793     else if (server.sentinel_mode) mode = "sentinel";
3794     else mode = "standalone";
3795 
3796     /* Show the ASCII logo if: log file is stdout AND stdout is a
3797      * tty AND syslog logging is disabled. Also show logo if the user
3798      * forced us to do so via redis.conf. */
3799     int show_logo = ((!server.syslog_enabled &&
3800                       server.logfile[0] == '\0' &&
3801                       isatty(fileno(stdout))) ||
3802                      server.always_show_logo);
3803 
3804     if (!show_logo) {
3805         serverLog(LL_NOTICE,
3806             "Running mode=%s, port=%d.",
3807             mode, server.port
3808         );
3809     } else {
3810         snprintf(buf,1024*16,ascii_logo,
3811             REDIS_VERSION,
3812             redisGitSHA1(),
3813             strtol(redisGitDirty(),NULL,10) > 0,
3814             (sizeof(long) == 8) ? "64" : "32",
3815             mode, server.port,
3816             (long) getpid()
3817         );
3818         serverLogRaw(LL_NOTICE|LL_RAW,buf);
3819     }
3820     zfree(buf);
3821 }
3822 
sigShutdownHandler(int sig)3823 static void sigShutdownHandler(int sig) {
3824     char *msg;
3825 
3826     switch (sig) {
3827     case SIGINT:
3828         msg = "Received SIGINT scheduling shutdown...";
3829         break;
3830     case SIGTERM:
3831         msg = "Received SIGTERM scheduling shutdown...";
3832         break;
3833     default:
3834         msg = "Received shutdown signal, scheduling shutdown...";
3835     };
3836 
3837     /* SIGINT is often delivered via Ctrl+C in an interactive session.
3838      * If we receive the signal the second time, we interpret this as
3839      * the user really wanting to quit ASAP without waiting to persist
3840      * on disk. */
3841     if (server.shutdown_asap && sig == SIGINT) {
3842         serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
3843         rdbRemoveTempFile(getpid());
3844         exit(1); /* Exit with an error since this was not a clean shutdown. */
3845     } else if (server.loading) {
3846         serverLogFromHandler(LL_WARNING, "Received shutdown signal during loading, exiting now.");
3847         exit(0);
3848     }
3849 
3850     serverLogFromHandler(LL_WARNING, msg);
3851     server.shutdown_asap = 1;
3852 }
3853 
setupSignalHandlers(void)3854 void setupSignalHandlers(void) {
3855     struct sigaction act;
3856 
3857     /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
3858      * Otherwise, sa_handler is used. */
3859     sigemptyset(&act.sa_mask);
3860     act.sa_flags = 0;
3861     act.sa_handler = sigShutdownHandler;
3862     sigaction(SIGTERM, &act, NULL);
3863     sigaction(SIGINT, &act, NULL);
3864 
3865 #ifdef HAVE_BACKTRACE
3866     sigemptyset(&act.sa_mask);
3867     act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
3868     act.sa_sigaction = sigsegvHandler;
3869     sigaction(SIGSEGV, &act, NULL);
3870     sigaction(SIGBUS, &act, NULL);
3871     sigaction(SIGFPE, &act, NULL);
3872     sigaction(SIGILL, &act, NULL);
3873 #endif
3874     return;
3875 }
3876 
3877 void memtest(size_t megabytes, int passes);
3878 
/* Decide whether the server should start in Sentinel mode.
 *
 * Returns 1 when argv[0] contains "redis-sentinel" (e.g. invoked through a
 * redis-sentinel symlink) or when any later argument is exactly
 * "--sentinel"; returns 0 otherwise. */
int checkForSentinelMode(int argc, char **argv) {
    int found = strstr(argv[0],"redis-sentinel") != NULL;
    int idx = 1;

    while (!found && idx < argc) {
        found = strcmp(argv[idx],"--sentinel") == 0;
        idx++;
    }
    return found;
}
3889 
/* Function called at startup to load RDB or AOF file in memory.
 *
 * If AOF is enabled (server.aof_state == AOF_ON) only the append only file
 * is loaded; rdbLoad() is not called in that case. Otherwise the RDB file is
 * loaded, and when this instance is a replica (masterhost set, or a cluster
 * slave) and the RDB carries valid replication info, the replication ID and
 * offset are restored so a partial resync with the master is possible.
 * An RDB load failure with errno == ENOENT is not fatal (the function just
 * returns); any other RDB load failure logs and exits with status 1. */
void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == AOF_ON) {
        /* NOTE(review): a non-C_OK result is not handled here; presumably
         * loadAppendOnlyFile() handles its own fatal errors — confirm. */
        if (loadAppendOnlyFile(server.aof_filename) == C_OK)
            serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
        rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
        if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
            serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);

            /* Restore the replication ID / offset from the RDB file. */
            if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself)))&&
                rsi.repl_id_is_set &&
                rsi.repl_offset != -1 &&
                /* Note that older implementations may save a repl_stream_db
                 * of -1 inside the RDB file in a wrong way, see more information
                 * in function rdbPopulateSaveInfo. */
                rsi.repl_stream_db != -1)
            {
                memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
                server.master_repl_offset = rsi.repl_offset;
                /* If we are a slave, create a cached master from this
                 * information, in order to allow partial resynchronizations
                 * with masters. */
                replicationCacheMasterUsingMyself();
                selectDb(server.cached_master,rsi.repl_stream_db);
            }
        } else if (errno != ENOENT) {
            /* errno is read right after the failed rdbLoad(); a missing RDB
             * file (ENOENT) falls through without exiting. */
            serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
    }
}
3925 
redisOutOfMemoryHandler(size_t allocation_size)3926 void redisOutOfMemoryHandler(size_t allocation_size) {
3927     serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!",
3928         allocation_size);
3929     serverPanic("Redis aborting for OUT OF MEMORY");
3930 }
3931 
/* Set the process title to "<title> <addr>:<port>[ mode]".
 *
 * <addr> is the first configured bind address or "*" when none is set, and
 * the suffix is " [cluster]", " [sentinel]" or empty. Without
 * USE_SETPROCTITLE this is a no-op. */
void redisSetProcTitle(char *title) {
#ifdef USE_SETPROCTITLE
    const char *server_mode;
    const char *listen_addr;

    if (server.cluster_enabled)
        server_mode = " [cluster]";
    else if (server.sentinel_mode)
        server_mode = " [sentinel]";
    else
        server_mode = "";

    listen_addr = server.bindaddr_count ? server.bindaddr[0] : "*";
    setproctitle("%s %s:%d%s", title, listen_addr, server.port, server_mode);
#else
    UNUSED(title);
#endif
}
3947 
3948 /*
3949  * Check whether systemd or upstart have been used to start redis.
3950  */
3951 
redisSupervisedUpstart(void)3952 int redisSupervisedUpstart(void) {
3953     const char *upstart_job = getenv("UPSTART_JOB");
3954 
3955     if (!upstart_job) {
3956         serverLog(LL_WARNING,
3957                 "upstart supervision requested, but UPSTART_JOB not found");
3958         return 0;
3959     }
3960 
3961     serverLog(LL_NOTICE, "supervised by upstart, will stop to signal readiness");
3962     raise(SIGSTOP);
3963     unsetenv("UPSTART_JOB");
3964     return 1;
3965 }
3966 
redisSupervisedSystemd(void)3967 int redisSupervisedSystemd(void) {
3968     const char *notify_socket = getenv("NOTIFY_SOCKET");
3969     int fd = 1;
3970     struct sockaddr_un su;
3971     struct iovec iov;
3972     struct msghdr hdr;
3973     int sendto_flags = 0;
3974 
3975     if (!notify_socket) {
3976         serverLog(LL_WARNING,
3977                 "systemd supervision requested, but NOTIFY_SOCKET not found");
3978         return 0;
3979     }
3980 
3981     if ((strchr("@/", notify_socket[0])) == NULL || strlen(notify_socket) < 2) {
3982         return 0;
3983     }
3984 
3985     serverLog(LL_NOTICE, "supervised by systemd, will signal readiness");
3986     if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) {
3987         serverLog(LL_WARNING,
3988                 "Can't connect to systemd socket %s", notify_socket);
3989         return 0;
3990     }
3991 
3992     memset(&su, 0, sizeof(su));
3993     su.sun_family = AF_UNIX;
3994     strncpy (su.sun_path, notify_socket, sizeof(su.sun_path) -1);
3995     su.sun_path[sizeof(su.sun_path) - 1] = '\0';
3996 
3997     if (notify_socket[0] == '@')
3998         su.sun_path[0] = '\0';
3999 
4000     memset(&iov, 0, sizeof(iov));
4001     iov.iov_base = "READY=1";
4002     iov.iov_len = strlen("READY=1");
4003 
4004     memset(&hdr, 0, sizeof(hdr));
4005     hdr.msg_name = &su;
4006     hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) +
4007         strlen(notify_socket);
4008     hdr.msg_iov = &iov;
4009     hdr.msg_iovlen = 1;
4010 
4011     unsetenv("NOTIFY_SOCKET");
4012 #ifdef HAVE_MSG_NOSIGNAL
4013     sendto_flags |= MSG_NOSIGNAL;
4014 #endif
4015     if (sendmsg(fd, &hdr, sendto_flags) < 0) {
4016         serverLog(LL_WARNING, "Can't send notification to systemd");
4017         close(fd);
4018         return 0;
4019     }
4020     close(fd);
4021     return 1;
4022 }
4023 
redisIsSupervised(int mode)4024 int redisIsSupervised(int mode) {
4025     if (mode == SUPERVISED_AUTODETECT) {
4026         const char *upstart_job = getenv("UPSTART_JOB");
4027         const char *notify_socket = getenv("NOTIFY_SOCKET");
4028 
4029         if (upstart_job) {
4030             redisSupervisedUpstart();
4031         } else if (notify_socket) {
4032             redisSupervisedSystemd();
4033         }
4034     } else if (mode == SUPERVISED_UPSTART) {
4035         return redisSupervisedUpstart();
4036     } else if (mode == SUPERVISED_SYSTEMD) {
4037         return redisSupervisedSystemd();
4038     }
4039 
4040     return 0;
4041 }
4042 
loop(void * arg)4043 static int loop(void *arg) {
4044     aeMain((aeEventLoop *)arg);
4045     return 0;
4046 }
4047 
main(int argc,char ** argv)4048 int main(int argc, char **argv) {
4049     struct timeval tv;
4050     int j;
4051 
4052 #ifdef HAVE_FF_KQUEUE
4053     int rc = ff_init(argc, argv);
4054     assert(0 == rc);
4055     ff_mod_init();
4056     int new_argc = argc - 4;
4057     if (new_argc <= 0) {
4058         new_argc = 1;
4059     }
4060 
4061     char **new_argv = zmalloc(sizeof(char *) * new_argc);
4062     new_argv[0] = argv[0];
4063     int i;
4064     for (i = 1; i < new_argc; i++) {
4065         new_argv[i] = argv[i + 4];
4066     }
4067     argv = new_argv;
4068     argc = new_argc;
4069 #endif
4070 
4071 
4072 #ifdef REDIS_TEST
4073     if (argc == 3 && !strcasecmp(argv[1], "test")) {
4074         if (!strcasecmp(argv[2], "ziplist")) {
4075             return ziplistTest(argc, argv);
4076         } else if (!strcasecmp(argv[2], "quicklist")) {
4077             quicklistTest(argc, argv);
4078         } else if (!strcasecmp(argv[2], "intset")) {
4079             return intsetTest(argc, argv);
4080         } else if (!strcasecmp(argv[2], "zipmap")) {
4081             return zipmapTest(argc, argv);
4082         } else if (!strcasecmp(argv[2], "sha1test")) {
4083             return sha1Test(argc, argv);
4084         } else if (!strcasecmp(argv[2], "util")) {
4085             return utilTest(argc, argv);
4086         } else if (!strcasecmp(argv[2], "endianconv")) {
4087             return endianconvTest(argc, argv);
4088         } else if (!strcasecmp(argv[2], "crc64")) {
4089             return crc64Test(argc, argv);
4090         } else if (!strcasecmp(argv[2], "zmalloc")) {
4091             return zmalloc_test(argc, argv);
4092         }
4093 
4094         return -1; /* test not found */
4095     }
4096 #endif
4097 
4098     /* We need to initialize our libraries, and the server configuration. */
4099 #ifdef INIT_SETPROCTITLE_REPLACEMENT
4100     spt_init(argc, argv);
4101 #endif
4102     setlocale(LC_COLLATE,"");
4103     tzset(); /* Populates 'timezone' global. */
4104     zmalloc_set_oom_handler(redisOutOfMemoryHandler);
4105     srand(time(NULL)^getpid());
4106     gettimeofday(&tv,NULL);
4107 
4108     char hashseed[16];
4109     getRandomHexChars(hashseed,sizeof(hashseed));
4110     dictSetHashFunctionSeed((uint8_t*)hashseed);
4111     server.sentinel_mode = checkForSentinelMode(argc,argv);
4112     initServerConfig();
4113     moduleInitModulesSystem();
4114 
4115     /* Store the executable path and arguments in a safe place in order
4116      * to be able to restart the server later. */
4117     server.executable = getAbsolutePath(argv[0]);
4118     server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
4119     server.exec_argv[argc] = NULL;
4120     for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
4121 
4122     /* We need to init sentinel right now as parsing the configuration file
4123      * in sentinel mode will have the effect of populating the sentinel
4124      * data structures with master nodes to monitor. */
4125     if (server.sentinel_mode) {
4126         initSentinelConfig();
4127         initSentinel();
4128     }
4129 
4130     /* Check if we need to start in redis-check-rdb/aof mode. We just execute
4131      * the program main. However the program is part of the Redis executable
4132      * so that we can easily execute an RDB check on loading errors. */
4133     if (strstr(argv[0],"redis-check-rdb") != NULL)
4134         redis_check_rdb_main(argc,argv,NULL);
4135     else if (strstr(argv[0],"redis-check-aof") != NULL)
4136         redis_check_aof_main(argc,argv);
4137 
4138     if (argc >= 2) {
4139         j = 1; /* First option to parse in argv[] */
4140         sds options = sdsempty();
4141         char *configfile = NULL;
4142 
4143         /* Handle special options --help and --version */
4144         if (strcmp(argv[1], "-v") == 0 ||
4145             strcmp(argv[1], "--version") == 0) version();
4146         if (strcmp(argv[1], "--help") == 0 ||
4147             strcmp(argv[1], "-h") == 0) usage();
4148         if (strcmp(argv[1], "--test-memory") == 0) {
4149             if (argc == 3) {
4150                 memtest(atoi(argv[2]),50);
4151                 exit(0);
4152             } else {
4153                 fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
4154                 fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
4155                 exit(1);
4156             }
4157         }
4158 
4159         /* First argument is the config file name? */
4160         if (argv[j][0] != '-' || argv[j][1] != '-') {
4161             configfile = argv[j];
4162             server.configfile = getAbsolutePath(configfile);
4163             /* Replace the config file in server.exec_argv with
4164              * its absolute path. */
4165             zfree(server.exec_argv[j]);
4166             server.exec_argv[j] = zstrdup(server.configfile);
4167             j++;
4168         }
4169 
4170         /* All the other options are parsed and conceptually appended to the
4171          * configuration file. For instance --port 6380 will generate the
4172          * string "port 6380\n" to be parsed after the actual file name
4173          * is parsed, if any. */
4174         while(j != argc) {
4175             if (argv[j][0] == '-' && argv[j][1] == '-') {
4176                 /* Option name */
4177                 if (!strcmp(argv[j], "--check-rdb")) {
4178                     /* Argument has no options, need to skip for parsing. */
4179                     j++;
4180                     continue;
4181                 }
4182                 if (sdslen(options)) options = sdscat(options,"\n");
4183                 options = sdscat(options,argv[j]+2);
4184                 options = sdscat(options," ");
4185             } else {
4186                 /* Option argument */
4187                 options = sdscatrepr(options,argv[j],strlen(argv[j]));
4188                 options = sdscat(options," ");
4189             }
4190             j++;
4191         }
4192         if (server.sentinel_mode && configfile && *configfile == '-') {
4193             serverLog(LL_WARNING,
4194                 "Sentinel config from STDIN not allowed.");
4195             serverLog(LL_WARNING,
4196                 "Sentinel needs config file on disk to save state.  Exiting...");
4197             exit(1);
4198         }
4199         resetServerSaveParams();
4200         loadServerConfig(configfile,options);
4201         sdsfree(options);
4202     }
4203 
4204     serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo");
4205     serverLog(LL_WARNING,
4206         "Redis version=%s, bits=%d, commit=%s, modified=%d, pid=%d, just started",
4207             REDIS_VERSION,
4208             (sizeof(long) == 8) ? 64 : 32,
4209             redisGitSHA1(),
4210             strtol(redisGitDirty(),NULL,10) > 0,
4211             (int)getpid());
4212 
4213     if (argc == 1) {
4214         serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
4215     } else {
4216         serverLog(LL_WARNING, "Configuration loaded");
4217     }
4218 
4219     server.supervised = redisIsSupervised(server.supervised_mode);
4220     int background = server.daemonize && !server.supervised;
4221     if (background) daemonize();
4222 
4223     initServer();
4224     if (background || server.pidfile) createPidFile();
4225     redisSetProcTitle(argv[0]);
4226     redisAsciiArt();
4227     checkTcpBacklogSettings();
4228 
4229     if (!server.sentinel_mode) {
4230         /* Things not needed when running in Sentinel mode. */
4231         serverLog(LL_WARNING,"Server initialized");
4232     #ifdef __linux__
4233         linuxMemoryWarnings();
4234     #endif
4235         moduleLoadFromQueue();
4236         loadDataFromDisk();
4237         if (server.cluster_enabled) {
4238             if (verifyClusterConfigWithData() == C_ERR) {
4239                 serverLog(LL_WARNING,
4240                     "You can't have keys in a DB different than DB 0 when in "
4241                     "Cluster mode. Exiting.");
4242                 exit(1);
4243             }
4244         }
4245         if (server.ipfd_count > 0)
4246             serverLog(LL_NOTICE,"Ready to accept connections");
4247         if (server.sofd > 0)
4248             serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
4249     } else {
4250         sentinelIsRunning();
4251     }
4252 
4253     /* Warning the user about suspicious maxmemory setting. */
4254     if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
4255         serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
4256     }
4257 
4258     aeSetBeforeSleepProc(server.el,beforeSleep);
4259     aeSetAfterSleepProc(server.el,afterSleep);
4260     ff_run(loop, server.el);
4261     aeDeleteEventLoop(server.el);
4262 #ifdef HAVE_FF_KQUEUE
4263     zfree(new_argv);
4264 #endif
4265     return 0;
4266 }
4267 
4268 /* The End */
4269