1 /*
2 * Copyright (c) 2009-2016, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31 #include "cluster.h"
32 #include "slowlog.h"
33 #include "bio.h"
34 #include "latency.h"
35 #include "atomicvar.h"
36
37 #ifdef HAVE_FF_KQUEUE
38 #include "ff_api.h"
39 #include "anet_ff.h"
40 #endif
41
42 #include <time.h>
43 #include <signal.h>
44 #include <sys/wait.h>
45 #include <errno.h>
46 #include <assert.h>
47 #include <ctype.h>
48 #include <stdarg.h>
49 #include <arpa/inet.h>
50 #include <sys/stat.h>
51 #include <fcntl.h>
52 #include <sys/time.h>
53 #include <sys/resource.h>
54 #include <sys/uio.h>
55 #include <sys/un.h>
56 #include <limits.h>
57 #include <float.h>
58 #include <math.h>
59 #include <sys/resource.h>
60 #include <sys/utsname.h>
61 #include <locale.h>
62 #include <sys/socket.h>
63
/* Our shared "common" objects */

/* The single global instance of the shared objects structure. It is only
 * defined here, without an initializer. */
struct sharedObjectsStruct shared;

/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

double R_Zero, R_PosInf, R_NegInf, R_Nan;

/*================================= Globals ================================= */

/* Global vars */
struct redisServer server; /* Server global state */
volatile unsigned long lru_clock; /* Server global current LRU time. */
79
80 /* Our command table.
81 *
82 * Every entry is composed of the following fields:
83 *
84 * name: a string representing the command name.
85 * function: pointer to the C function implementing the command.
86 * arity: number of arguments, it is possible to use -N to say >= N
87 * sflags: command flags as string. See below for a table of flags.
88 * flags: flags as bitmask. Computed by Redis using the 'sflags' field.
89 * get_keys_proc: an optional function to get key arguments from a command.
90 * This is only used when the following three fields are not
91 * enough to specify what arguments are keys.
92 * first_key_index: first argument that is a key
93 * last_key_index: last argument that is a key
94 * key_step: step to get all the keys from first to last argument. For instance
95 * in MSET the step is two since arguments are key,val,key,val,...
96 * microseconds: microseconds of total execution time for this command.
97 * calls: total number of calls of this command.
98 *
99 * The flags, microseconds and calls fields are computed by Redis and should
100 * always be set to zero.
101 *
102 * Command flags are expressed using strings where every character represents
 * a flag. Later the populateCommandTable() function will take care of
 * populating the real 'flags' field using these characters.
105 *
106 * This is the meaning of the flags:
107 *
108 * w: write command (may modify the key space).
109 * r: read command (will never modify the key space).
110 * m: may increase memory usage once called. Don't allow if out of memory.
111 * a: admin command, like SAVE or SHUTDOWN.
112 * p: Pub/Sub related command.
113 * f: force replication of this command, regardless of server.dirty.
114 * s: command not allowed in scripts.
115 * R: random command. Command is not deterministic, that is, the same command
116 * with the same arguments, with the same key space, may have different
117 * results. For instance SPOP and RANDOMKEY are two random commands.
118 * S: Sort command output array if called from script, so that the output
119 * is deterministic.
120 * l: Allow command while loading the database.
 * t: Allow command while a slave has stale data but is not allowed to
 *    serve this data. Normally no command is accepted in this condition
 *    but just a few.
124 * M: Do not automatically propagate the command on MONITOR.
125 * k: Perform an implicit ASKING for this command, so the command will be
126 * accepted in cluster mode if the slot is marked as 'importing'.
127 * F: Fast command: O(1) or O(log(N)) command that should never delay
128 * its execution as long as the kernel scheduler is giving us time.
129 * Note that commands that may trigger a DEL as a side effect (like SET)
130 * are not fast commands.
131 */
132 struct redisCommand redisCommandTable[] = {
133 {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
134 {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
135 {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
136 {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
137 {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
138 {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
139 {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
140 {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
141 {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
142 {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0},
143 {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
144 {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
145 {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
146 {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0},
147 {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
148 {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
149 {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
150 {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
151 {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
152 {"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
153 {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
154 {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
155 {"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
156 {"lpushx",lpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
157 {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
158 {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
159 {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
160 {"brpop",brpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
161 {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
162 {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
163 {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
164 {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
165 {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
166 {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
167 {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
168 {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
169 {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
170 {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
171 {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
172 {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
173 {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
174 {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
175 {"spop",spopCommand,-2,"wRF",0,NULL,1,1,1,0,0},
176 {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
177 {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
178 {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
179 {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
180 {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
181 {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
182 {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
183 {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
184 {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
185 {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
186 {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
187 {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0},
188 {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0},
189 {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0},
190 {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0},
191 {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
192 {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
193 {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
194 {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
195 {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
196 {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
197 {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
198 {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0},
199 {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0},
200 {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
201 {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0},
202 {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0},
203 {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
204 {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
205 {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
206 {"zpopmin",zpopminCommand,-2,"wF",0,NULL,1,1,1,0,0},
207 {"zpopmax",zpopmaxCommand,-2,"wF",0,NULL,1,1,1,0,0},
208 {"bzpopmin",bzpopminCommand,-3,"wsF",0,NULL,1,-2,1,0,0},
209 {"bzpopmax",bzpopmaxCommand,-3,"wsF",0,NULL,1,-2,1,0,0},
210 {"hset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
211 {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
212 {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
213 {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
214 {"hmget",hmgetCommand,-3,"rF",0,NULL,1,1,1,0,0},
215 {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
216 {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0},
217 {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
218 {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0},
219 {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0},
220 {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
221 {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
222 {"hgetall",hgetallCommand,2,"rR",0,NULL,1,1,1,0,0},
223 {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0},
224 {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
225 {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
226 {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
227 {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0},
228 {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0},
229 {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
230 {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
231 {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
232 {"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0},
233 {"swapdb",swapdbCommand,3,"wF",0,NULL,0,0,0,0,0},
234 {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0},
235 {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0},
236 {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0},
237 {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0},
238 {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0},
239 {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0},
240 {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0},
241 {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
242 {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0},
243 {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0},
244 {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0},
245 {"ping",pingCommand,-1,"tF",0,NULL,0,0,0,0,0},
246 {"echo",echoCommand,2,"F",0,NULL,0,0,0,0,0},
247 {"save",saveCommand,1,"as",0,NULL,0,0,0,0,0},
248 {"bgsave",bgsaveCommand,-1,"as",0,NULL,0,0,0,0,0},
249 {"bgrewriteaof",bgrewriteaofCommand,1,"as",0,NULL,0,0,0,0,0},
250 {"shutdown",shutdownCommand,-1,"aslt",0,NULL,0,0,0,0,0},
251 {"lastsave",lastsaveCommand,1,"RF",0,NULL,0,0,0,0,0},
252 {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0},
253 {"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0},
254 {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
255 {"discard",discardCommand,1,"sF",0,NULL,0,0,0,0,0},
256 {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
257 {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
258 {"replconf",replconfCommand,-1,"aslt",0,NULL,0,0,0,0,0},
259 {"flushdb",flushdbCommand,-1,"w",0,NULL,0,0,0,0,0},
260 {"flushall",flushallCommand,-1,"w",0,NULL,0,0,0,0,0},
261 {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0},
262 {"info",infoCommand,-1,"ltR",0,NULL,0,0,0,0,0},
263 {"monitor",monitorCommand,1,"as",0,NULL,0,0,0,0,0},
264 {"ttl",ttlCommand,2,"rFR",0,NULL,1,1,1,0,0},
265 {"touch",touchCommand,-2,"rF",0,NULL,1,1,1,0,0},
266 {"pttl",pttlCommand,2,"rFR",0,NULL,1,1,1,0,0},
267 {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
268 {"slaveof",replicaofCommand,3,"ast",0,NULL,0,0,0,0,0},
269 {"replicaof",replicaofCommand,3,"ast",0,NULL,0,0,0,0,0},
270 {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
271 {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
272 {"config",configCommand,-2,"last",0,NULL,0,0,0,0,0},
273 {"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
274 {"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
275 {"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
276 {"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
277 {"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0},
278 {"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},
279 {"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0},
280 {"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0},
281 {"cluster",clusterCommand,-2,"a",0,NULL,0,0,0,0,0},
282 {"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
283 {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
284 {"migrate",migrateCommand,-6,"wR",0,migrateGetKeys,0,0,0,0,0},
285 {"asking",askingCommand,1,"F",0,NULL,0,0,0,0,0},
286 {"readonly",readonlyCommand,1,"F",0,NULL,0,0,0,0,0},
287 {"readwrite",readwriteCommand,1,"F",0,NULL,0,0,0,0,0},
288 {"dump",dumpCommand,2,"rR",0,NULL,1,1,1,0,0},
289 {"object",objectCommand,-2,"rR",0,NULL,2,2,1,0,0},
290 {"memory",memoryCommand,-2,"rR",0,NULL,0,0,0,0,0},
291 {"client",clientCommand,-2,"as",0,NULL,0,0,0,0,0},
292 {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
293 {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
294 {"slowlog",slowlogCommand,-2,"aR",0,NULL,0,0,0,0,0},
295 {"script",scriptCommand,-2,"s",0,NULL,0,0,0,0,0},
296 {"time",timeCommand,1,"RF",0,NULL,0,0,0,0,0},
297 {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
298 {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
299 {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0},
300 {"wait",waitCommand,3,"s",0,NULL,0,0,0,0,0},
301 {"command",commandCommand,0,"ltR",0,NULL,0,0,0,0,0},
302 {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
303 {"georadius",georadiusCommand,-6,"w",0,georadiusGetKeys,1,1,1,0,0},
304 {"georadius_ro",georadiusroCommand,-6,"r",0,georadiusGetKeys,1,1,1,0,0},
305 {"georadiusbymember",georadiusbymemberCommand,-5,"w",0,georadiusGetKeys,1,1,1,0,0},
306 {"georadiusbymember_ro",georadiusbymemberroCommand,-5,"r",0,georadiusGetKeys,1,1,1,0,0},
307 {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
308 {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
309 {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
310 {"pfselftest",pfselftestCommand,1,"a",0,NULL,0,0,0,0,0},
311 {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
312 {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
313 {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
314 {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
315 {"xadd",xaddCommand,-5,"wmFR",0,NULL,1,1,1,0,0},
316 {"xrange",xrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
317 {"xrevrange",xrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
318 {"xlen",xlenCommand,2,"rF",0,NULL,1,1,1,0,0},
319 {"xread",xreadCommand,-4,"rs",0,xreadGetKeys,1,1,1,0,0},
320 {"xreadgroup",xreadCommand,-7,"ws",0,xreadGetKeys,1,1,1,0,0},
321 {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
322 {"xsetid",xsetidCommand,3,"wmF",0,NULL,1,1,1,0,0},
323 {"xack",xackCommand,-4,"wF",0,NULL,1,1,1,0,0},
324 {"xpending",xpendingCommand,-3,"rR",0,NULL,1,1,1,0,0},
325 {"xclaim",xclaimCommand,-6,"wRF",0,NULL,1,1,1,0,0},
326 {"xinfo",xinfoCommand,-2,"rR",0,NULL,2,2,1,0,0},
327 {"xdel",xdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
328 {"xtrim",xtrimCommand,-2,"wFR",0,NULL,1,1,1,0,0},
329 {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
330 {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
331 {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0},
332 {"lolwut",lolwutCommand,-1,"r",0,NULL,0,0,0,0,0}
333 };
334
335 /*============================ Utility functions ============================ */
336
337 /* We use a private localtime implementation which is fork-safe. The logging
338 * function of Redis may be called from other threads. */
339 void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst);
340
341 /* Low level logging. To use only for very big messages, otherwise
342 * serverLog() is to prefer. */
serverLogRaw(int level,const char * msg)343 void serverLogRaw(int level, const char *msg) {
344 const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING };
345 const char *c = ".-*#";
346 FILE *fp;
347 char buf[64];
348 int rawmode = (level & LL_RAW);
349 int log_to_stdout = server.logfile[0] == '\0';
350
351 level &= 0xff; /* clear flags */
352 if (level < server.verbosity) return;
353
354 fp = log_to_stdout ? stdout : fopen(server.logfile,"a");
355 if (!fp) return;
356
357 if (rawmode) {
358 fprintf(fp,"%s",msg);
359 } else {
360 int off;
361 struct timeval tv;
362 int role_char;
363 pid_t pid = getpid();
364
365 gettimeofday(&tv,NULL);
366 struct tm tm;
367 nolocks_localtime(&tm,tv.tv_sec,server.timezone,server.daylight_active);
368 off = strftime(buf,sizeof(buf),"%d %b %Y %H:%M:%S.",&tm);
369 snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000);
370 if (server.sentinel_mode) {
371 role_char = 'X'; /* Sentinel. */
372 } else if (pid != server.pid) {
373 role_char = 'C'; /* RDB / AOF writing child. */
374 } else {
375 role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */
376 }
377 fprintf(fp,"%d:%c %s %c %s\n",
378 (int)getpid(),role_char, buf,c[level],msg);
379 }
380 fflush(fp);
381
382 if (!log_to_stdout) fclose(fp);
383 if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg);
384 }
385
386 /* Like serverLogRaw() but with printf-alike support. This is the function that
387 * is used across the code. The raw version is only used in order to dump
388 * the INFO output on crash. */
serverLog(int level,const char * fmt,...)389 void serverLog(int level, const char *fmt, ...) {
390 va_list ap;
391 char msg[LOG_MAX_LEN];
392
393 if ((level&0xff) < server.verbosity) return;
394
395 va_start(ap, fmt);
396 vsnprintf(msg, sizeof(msg), fmt, ap);
397 va_end(ap);
398
399 serverLogRaw(level,msg);
400 }
401
402 /* Log a fixed message without printf-alike capabilities, in a way that is
403 * safe to call from a signal handler.
404 *
405 * We actually use this only for signals that are not fatal from the point
406 * of view of Redis. Signals that are going to kill the server anyway and
407 * where we need printf-alike features are served by serverLog(). */
serverLogFromHandler(int level,const char * msg)408 void serverLogFromHandler(int level, const char *msg) {
409 int fd;
410 int log_to_stdout = server.logfile[0] == '\0';
411 char buf[64];
412
413 if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize))
414 return;
415 fd = log_to_stdout ? STDOUT_FILENO :
416 open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644);
417 if (fd == -1) return;
418 ll2string(buf,sizeof(buf),getpid());
419 if (write(fd,buf,strlen(buf)) == -1) goto err;
420 if (write(fd,":signal-handler (",17) == -1) goto err;
421 ll2string(buf,sizeof(buf),time(NULL));
422 if (write(fd,buf,strlen(buf)) == -1) goto err;
423 if (write(fd,") ",2) == -1) goto err;
424 if (write(fd,msg,strlen(msg)) == -1) goto err;
425 if (write(fd,"\n",1) == -1) goto err;
426 err:
427 if (!log_to_stdout) close(fd);
428 }
429
/* Return the UNIX time in microseconds, as obtained from gettimeofday().
 * The seconds are widened to long long before scaling. */
long long ustime(void) {
    struct timeval now;

    gettimeofday(&now, NULL);
    return ((long long)now.tv_sec)*1000000 + now.tv_usec;
}
440
441 /* Return the UNIX time in milliseconds */
mstime(void)442 mstime_t mstime(void) {
443 return ustime()/1000;
444 }
445
/* Terminate a child process with the given exit code.
 *
 * After an RDB dump or AOF rewrite we exit from children using _exit()
 * instead of exit(), because the latter may interact with the same file
 * objects used by the parent process. However if we are testing the coverage
 * (COVERAGE_TEST defined) normal exit() is used in order to obtain the right
 * coverage information. */
void exitFromChild(int retcode) {
#ifndef COVERAGE_TEST
    _exit(retcode);
#else
    exit(retcode);
#endif
}
457
458 /*====================== Hash table type implementation ==================== */
459
460 /* This is a hash table type that uses the SDS dynamic strings library as
461 * keys and redis objects as values (objects can hold SDS strings,
462 * lists, sets). */
463
/* Value destructor that simply releases the value with zfree().
 * 'privdata' is ignored. */
void dictVanillaFree(void *privdata, void *val) {
    zfree(val);
    DICT_NOTUSED(privdata);
}
469
dictListDestructor(void * privdata,void * val)470 void dictListDestructor(void *privdata, void *val)
471 {
472 DICT_NOTUSED(privdata);
473 listRelease((list*)val);
474 }
475
dictSdsKeyCompare(void * privdata,const void * key1,const void * key2)476 int dictSdsKeyCompare(void *privdata, const void *key1,
477 const void *key2)
478 {
479 int l1,l2;
480 DICT_NOTUSED(privdata);
481
482 l1 = sdslen((sds)key1);
483 l2 = sdslen((sds)key2);
484 if (l1 != l2) return 0;
485 return memcmp(key1, key2, l1) == 0;
486 }
487
/* A case insensitive version used for the command lookup table and other
 * places where case insensitive non binary-safe comparison is needed.
 *
 * The keys are compared as C strings with strcasecmp(), so the comparison
 * stops at the first null byte. Returns 1 on match, 0 otherwise.
 * 'privdata' is ignored. */
int dictSdsKeyCaseCompare(void *privdata, const void *key1,
        const void *key2)
{
    int diff;

    DICT_NOTUSED(privdata);

    diff = strcasecmp(key1, key2);
    return diff == 0;
}
497
/* Destructor for keys or values that are Redis objects: drops one reference
 * with decrRefCount(). A NULL value is accepted and ignored, since lazy
 * freeing will set the value to NULL. 'privdata' is ignored. */
void dictObjectDestructor(void *privdata, void *val) {
    DICT_NOTUSED(privdata);

    if (val != NULL) decrRefCount(val);
}
505
dictSdsDestructor(void * privdata,void * val)506 void dictSdsDestructor(void *privdata, void *val)
507 {
508 DICT_NOTUSED(privdata);
509
510 sdsfree(val);
511 }
512
dictObjKeyCompare(void * privdata,const void * key1,const void * key2)513 int dictObjKeyCompare(void *privdata, const void *key1,
514 const void *key2)
515 {
516 const robj *o1 = key1, *o2 = key2;
517 return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
518 }
519
dictObjHash(const void * key)520 uint64_t dictObjHash(const void *key) {
521 const robj *o = key;
522 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
523 }
524
/* Hash function for keys that are SDS strings: hashes the whole string,
 * using the length reported by sdslen(). */
uint64_t dictSdsHash(const void *key) {
    size_t len = sdslen((char*)key);

    return dictGenHashFunction((unsigned char*)key, len);
}
528
/* Like dictSdsHash() but uses dictGenCaseHashFunction(); it is the hash
 * function paired with dictSdsKeyCaseCompare() in the dictionary types of
 * this file. */
uint64_t dictSdsCaseHash(const void *key) {
    size_t len = sdslen((char*)key);

    return dictGenCaseHashFunction((unsigned char*)key, len);
}
532
dictEncObjKeyCompare(void * privdata,const void * key1,const void * key2)533 int dictEncObjKeyCompare(void *privdata, const void *key1,
534 const void *key2)
535 {
536 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
537 int cmp;
538
539 if (o1->encoding == OBJ_ENCODING_INT &&
540 o2->encoding == OBJ_ENCODING_INT)
541 return o1->ptr == o2->ptr;
542
543 o1 = getDecodedObject(o1);
544 o2 = getDecodedObject(o2);
545 cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
546 decrRefCount(o1);
547 decrRefCount(o2);
548 return cmp;
549 }
550
dictEncObjHash(const void * key)551 uint64_t dictEncObjHash(const void *key) {
552 robj *o = (robj*) key;
553
554 if (sdsEncodedObject(o)) {
555 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
556 } else {
557 if (o->encoding == OBJ_ENCODING_INT) {
558 char buf[32];
559 int len;
560
561 len = ll2string(buf,32,(long)o->ptr);
562 return dictGenHashFunction((unsigned char*)buf, len);
563 } else {
564 uint64_t hash;
565
566 o = getDecodedObject(o);
567 hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
568 decrRefCount(o);
569 return hash;
570 }
571 }
572 }
573
/* Generic hash table type where keys are Redis Objects, Values
 * dummy pointers. Keys may have any encoding (the encoding-aware hash and
 * compare functions are used) and are released with decrRefCount() when an
 * entry is removed. Values are never freed by the dictionary. */
dictType objectKeyPointerValueDictType = {
    dictEncObjHash,             /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictEncObjKeyCompare,       /* key compare */
    dictObjectDestructor,       /* key destructor */
    NULL                        /* val destructor */
};
584
/* Like objectKeyPointerValueDictType, but values are heap pointers that the
 * dictionary releases with zfree() (through dictVanillaFree) when an entry
 * is removed. */
dictType objectKeyHeapPointerValueDictType = {
    dictEncObjHash,             /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictEncObjKeyCompare,       /* key compare */
    dictObjectDestructor,       /* key destructor */
    dictVanillaFree             /* val destructor */
};
595
/* Set dictionary type. Keys are SDS strings, values are not used. Keys are
 * compared in a binary safe way and freed with sdsfree() on removal. */
dictType setDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    dictSdsDestructor,         /* key destructor */
    NULL                       /* val destructor */
};
605
/* Sorted sets hash (note: a skiplist is used in addition to the hash table).
 * No key destructor is installed here: the SDS key is shared with the
 * skiplist, which is the one that frees it. Values are not freed by the
 * dictionary either. */
dictType zsetDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* Note: SDS string shared & freed by skiplist */
    NULL                       /* val destructor */
};
615
/* Db->dict, keys are sds strings, vals are Redis objects. On removal the key
 * is freed with sdsfree() and the value loses one reference through
 * dictObjectDestructor(), which tolerates a NULL value. */
dictType dbDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictObjectDestructor   /* val destructor */
};
625
/* server.lua_scripts sha (as sds string) -> scripts (as robj) cache.
 * Keys are hashed and compared case insensitively. Both keys and values are
 * released by the dictionary on removal. */
dictType shaScriptObjectDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictObjectDestructor        /* val destructor */
};
635
/* Db->expires. Keys are SDS strings compared in a binary safe way. Neither
 * keys nor values are freed by this dictionary (both destructors are NULL).
 * NOTE(review): presumably the key strings are owned by the main Db->dict;
 * confirm against the code that populates the expires table. */
dictType keyptrDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    NULL,                       /* key destructor */
    NULL                        /* val destructor */
};
645
/* Command table. sds string -> command struct pointer. Command names are
 * hashed and compared case insensitively. The key is freed with sdsfree()
 * on removal, the command structure is not freed by the dictionary. */
dictType commandTableDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};
655
/* Hash type hash table (note that small hashes are represented with
 * ziplists). Both fields (keys) and values are SDS strings, and both are
 * freed with sdsfree() on removal. */
dictType hashDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictSdsDestructor           /* val destructor */
};
665
/* Keylist hash table type has unencoded redis objects as keys and
 * lists as values. It's used for blocking operations (BLPOP) and to
 * map swapped keys to a list of clients waiting for these keys to be loaded.
 * On removal the key loses one reference and the list is released with
 * listRelease(). */
dictType keylistDictType = {
    dictObjHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictObjKeyCompare,          /* key compare */
    dictObjectDestructor,       /* key destructor */
    dictListDestructor          /* val destructor */
};
677
/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
 * clusterNode structures. Keys are SDS strings freed on removal; the node
 * structures are not freed by the dictionary. */
dictType clusterNodesDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};
688
/* Cluster re-addition blacklist. This maps node IDs to the time
 * we can re-add this node. The goal is to avoid re-adding a removed
 * node for some time. Node IDs are hashed and compared case insensitively
 * and are freed with sdsfree() on removal. */
dictType clusterNodesBlackListDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};
700
/* Dictionary type for the modules table. Keys are SDS strings, hashed and
 * compared case insensitively and freed with sdsfree() on removal; values
 * are not freed by the dictionary.
 * NOTE(review): presumably this maps module names to module structures;
 * the previous comment here was a copy of the cluster blacklist one.
 * Confirm against the module subsystem. */
dictType modulesDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};
712
/* Migrate cache dict type. Keys are SDS strings compared in a binary safe
 * way and freed with sdsfree() on removal; values are not freed by the
 * dictionary. */
dictType migrateCacheDictType = {
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};
722
/* Replication cached script dict (server.repl_scriptcache_dict).
 * Keys are sds SHA1 strings, while values are not used at all in the current
 * implementation. Keys are hashed and compared case insensitively and are
 * freed with sdsfree() on removal. */
dictType replScriptCacheDictType = {
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};
734
/* Return non-zero if the hash table is worth shrinking: it must have more
 * than DICT_HT_INITIAL_SIZE slots and a fill percentage (used entries over
 * slots) below HASHTABLE_MIN_FILL. */
int htNeedsResize(dict *dict) {
    long long slots = dictSlots(dict);
    long long entries = dictSize(dict);

    /* Tables at (or below) the initial size are never shrunk. This check
     * also guarantees a non-zero divisor below. */
    if (slots <= DICT_HT_INITIAL_SIZE) return 0;
    return entries*100/slots < HASHTABLE_MIN_FILL;
}
743
744 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
745 * we resize the hash table to save memory */
tryResizeHashTables(int dbid)746 void tryResizeHashTables(int dbid) {
747 if (htNeedsResize(server.db[dbid].dict))
748 dictResize(server.db[dbid].dict);
749 if (htNeedsResize(server.db[dbid].expires))
750 dictResize(server.db[dbid].expires);
751 }
752
753 /* Our hash table implementation performs rehashing incrementally while
754 * we write/read from the hash table. Still if the server is idle, the hash
755 * table will use two tables for a long time. So we try to use 1 millisecond
756 * of CPU time at every call of this function to perform some rehahsing.
757 *
758 * The function returns 1 if some rehashing was performed, otherwise 0
759 * is returned. */
incrementallyRehash(int dbid)760 int incrementallyRehash(int dbid) {
761 /* Keys dictionary */
762 if (dictIsRehashing(server.db[dbid].dict)) {
763 dictRehashMilliseconds(server.db[dbid].dict,1);
764 return 1; /* already used our millisecond for this loop... */
765 }
766 /* Expires */
767 if (dictIsRehashing(server.db[dbid].expires)) {
768 dictRehashMilliseconds(server.db[dbid].expires,1);
769 return 1; /* already used our millisecond for this loop... */
770 }
771 return 0;
772 }
773
774 /* This function is called once a background process of some kind terminates,
775 * as we want to avoid resizing the hash tables when there is a child in order
776 * to play well with copy-on-write (otherwise when a resize happens lots of
777 * memory pages are copied). The goal of this function is to update the ability
778 * for dict.c to resize the hash tables accordingly to the fact we have o not
779 * running childs. */
updateDictResizePolicy(void)780 void updateDictResizePolicy(void) {
781 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
782 dictEnableResize();
783 else
784 dictDisableResize();
785 }
786
787 /* ======================= Cron: called every 100 ms ======================== */
788
789 /* Add a sample to the operations per second array of samples. */
trackInstantaneousMetric(int metric,long long current_reading)790 void trackInstantaneousMetric(int metric, long long current_reading) {
791 long long t = mstime() - server.inst_metric[metric].last_sample_time;
792 long long ops = current_reading -
793 server.inst_metric[metric].last_sample_count;
794 long long ops_sec;
795
796 ops_sec = t > 0 ? (ops*1000/t) : 0;
797
798 server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
799 ops_sec;
800 server.inst_metric[metric].idx++;
801 server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
802 server.inst_metric[metric].last_sample_time = mstime();
803 server.inst_metric[metric].last_sample_count = current_reading;
804 }
805
806 /* Return the mean of all the samples. */
getInstantaneousMetric(int metric)807 long long getInstantaneousMetric(int metric) {
808 int j;
809 long long sum = 0;
810
811 for (j = 0; j < STATS_METRIC_SAMPLES; j++)
812 sum += server.inst_metric[metric].samples[j];
813 return sum / STATS_METRIC_SAMPLES;
814 }
815
816 /* Check for timeouts. Returns non-zero if the client was terminated.
817 * The function gets the current time in milliseconds as argument since
818 * it gets called multiple times in a loop, so calling gettimeofday() for
819 * each iteration would be costly without any actual gain. */
clientsCronHandleTimeout(client * c,mstime_t now_ms)820 int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
821 time_t now = now_ms/1000;
822
823 if (server.maxidletime &&
824 !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */
825 !(c->flags & CLIENT_MASTER) && /* no timeout for masters */
826 !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */
827 !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */
828 (now - c->lastinteraction > server.maxidletime))
829 {
830 serverLog(LL_VERBOSE,"Closing idle client");
831 freeClient(c);
832 return 1;
833 } else if (c->flags & CLIENT_BLOCKED) {
834 /* Blocked OPS timeout is handled with milliseconds resolution.
835 * However note that the actual resolution is limited by
836 * server.hz. */
837
838 if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
839 /* Handle blocking operation specific timeout. */
840 replyToBlockedClientTimedOut(c);
841 unblockClient(c);
842 } else if (server.cluster_enabled) {
843 /* Cluster: handle unblock & redirect of clients blocked
844 * into keys no longer served by this server. */
845 if (clusterRedirectBlockedClientIfNeeded(c))
846 unblockClient(c);
847 }
848 }
849 return 0;
850 }
851
852 /* The client query buffer is an sds.c string that can end with a lot of
853 * free space not used, this function reclaims space if needed.
854 *
855 * The function always returns 0 as it never terminates the client. */
clientsCronResizeQueryBuffer(client * c)856 int clientsCronResizeQueryBuffer(client *c) {
857 size_t querybuf_size = sdsAllocSize(c->querybuf);
858 time_t idletime = server.unixtime - c->lastinteraction;
859
860 /* There are two conditions to resize the query buffer:
861 * 1) Query buffer is > BIG_ARG and too big for latest peak.
862 * 2) Query buffer is > BIG_ARG and client is idle. */
863 if (querybuf_size > PROTO_MBULK_BIG_ARG &&
864 ((querybuf_size/(c->querybuf_peak+1)) > 2 ||
865 idletime > 2))
866 {
867 /* Only resize the query buffer if it is actually wasting
868 * at least a few kbytes. */
869 if (sdsavail(c->querybuf) > 1024*4) {
870 c->querybuf = sdsRemoveFreeSpace(c->querybuf);
871 }
872 }
873 /* Reset the peak again to capture the peak memory usage in the next
874 * cycle. */
875 c->querybuf_peak = 0;
876
877 /* Clients representing masters also use a "pending query buffer" that
878 * is the yet not applied part of the stream we are reading. Such buffer
879 * also needs resizing from time to time, otherwise after a very large
880 * transfer (a huge value or a big MIGRATE operation) it will keep using
881 * a lot of memory. */
882 if (c->flags & CLIENT_MASTER) {
883 /* There are two conditions to resize the pending query buffer:
884 * 1) Pending Query buffer is > LIMIT_PENDING_QUERYBUF.
885 * 2) Used length is smaller than pending_querybuf_size/2 */
886 size_t pending_querybuf_size = sdsAllocSize(c->pending_querybuf);
887 if(pending_querybuf_size > LIMIT_PENDING_QUERYBUF &&
888 sdslen(c->pending_querybuf) < (pending_querybuf_size/2))
889 {
890 c->pending_querybuf = sdsRemoveFreeSpace(c->pending_querybuf);
891 }
892 }
893 return 0;
894 }
895
896 /* This function is used in order to track clients using the biggest amount
897 * of memory in the latest few seconds. This way we can provide such information
898 * in the INFO output (clients section), without having to do an O(N) scan for
899 * all the clients.
900 *
901 * This is how it works. We have an array of CLIENTS_PEAK_MEM_USAGE_SLOTS slots
902 * where we track, for each, the biggest client output and input buffers we
903 * saw in that slot. Every slot correspond to one of the latest seconds, since
904 * the array is indexed by doing UNIXTIME % CLIENTS_PEAK_MEM_USAGE_SLOTS.
905 *
906 * When we want to know what was recently the peak memory usage, we just scan
907 * such few slots searching for the maximum value. */
908 #define CLIENTS_PEAK_MEM_USAGE_SLOTS 8
909 size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS];
910 size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS];
911
clientsCronTrackExpansiveClients(client * c)912 int clientsCronTrackExpansiveClients(client *c) {
913 size_t in_usage = sdsAllocSize(c->querybuf);
914 size_t out_usage = getClientOutputBufferMemoryUsage(c);
915 int i = server.unixtime % CLIENTS_PEAK_MEM_USAGE_SLOTS;
916 int zeroidx = (i+1) % CLIENTS_PEAK_MEM_USAGE_SLOTS;
917
918 /* Always zero the next sample, so that when we switch to that second, we'll
919 * only register samples that are greater in that second without considering
920 * the history of such slot.
921 *
922 * Note: our index may jump to any random position if serverCron() is not
923 * called for some reason with the normal frequency, for instance because
924 * some slow command is called taking multiple seconds to execute. In that
925 * case our array may end containing data which is potentially older
926 * than CLIENTS_PEAK_MEM_USAGE_SLOTS seconds: however this is not a problem
927 * since here we want just to track if "recently" there were very expansive
928 * clients from the POV of memory usage. */
929 ClientsPeakMemInput[zeroidx] = 0;
930 ClientsPeakMemOutput[zeroidx] = 0;
931
932 /* Track the biggest values observed so far in this slot. */
933 if (in_usage > ClientsPeakMemInput[i]) ClientsPeakMemInput[i] = in_usage;
934 if (out_usage > ClientsPeakMemOutput[i]) ClientsPeakMemOutput[i] = out_usage;
935
936 return 0; /* This function never terminates the client. */
937 }
938
939 /* Return the max samples in the memory usage of clients tracked by
940 * the function clientsCronTrackExpansiveClients(). */
getExpansiveClientsInfo(size_t * in_usage,size_t * out_usage)941 void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
942 size_t i = 0, o = 0;
943 for (int j = 0; j < CLIENTS_PEAK_MEM_USAGE_SLOTS; j++) {
944 if (ClientsPeakMemInput[j] > i) i = ClientsPeakMemInput[j];
945 if (ClientsPeakMemOutput[j] > o) o = ClientsPeakMemOutput[j];
946 }
947 *in_usage = i;
948 *out_usage = o;
949 }
950
951 /* This function is called by serverCron() and is used in order to perform
952 * operations on clients that are important to perform constantly. For instance
953 * we use this function in order to disconnect clients after a timeout, including
954 * clients blocked in some blocking command with a non-zero timeout.
955 *
956 * The function makes some effort to process all the clients every second, even
957 * if this cannot be strictly guaranteed, since serverCron() may be called with
958 * an actual frequency lower than server.hz in case of latency events like slow
959 * commands.
960 *
961 * It is very important for this function, and the functions it calls, to be
962 * very fast: sometimes Redis has tens of hundreds of connected clients, and the
963 * default server.hz value is 10, so sometimes here we need to process thousands
964 * of clients per second, turning this function into a source of latency.
965 */
966 #define CLIENTS_CRON_MIN_ITERATIONS 5
clientsCron(void)967 void clientsCron(void) {
968 /* Try to process at least numclients/server.hz of clients
969 * per call. Since normally (if there are no big latency events) this
970 * function is called server.hz times per second, in the average case we
971 * process all the clients in 1 second. */
972 int numclients = listLength(server.clients);
973 int iterations = numclients/server.hz;
974 mstime_t now = mstime();
975
976 /* Process at least a few clients while we are at it, even if we need
977 * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
978 * of processing each client once per second. */
979 if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
980 iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
981 numclients : CLIENTS_CRON_MIN_ITERATIONS;
982
983 while(listLength(server.clients) && iterations--) {
984 client *c;
985 listNode *head;
986
987 /* Rotate the list, take the current head, process.
988 * This way if the client must be removed from the list it's the
989 * first element and we don't incur into O(N) computation. */
990 listRotate(server.clients);
991 head = listFirst(server.clients);
992 c = listNodeValue(head);
993 /* The following functions do different service checks on the client.
994 * The protocol is that they return non-zero if the client was
995 * terminated. */
996 if (clientsCronHandleTimeout(c,now)) continue;
997 if (clientsCronResizeQueryBuffer(c)) continue;
998 if (clientsCronTrackExpansiveClients(c)) continue;
999 }
1000 }
1001
1002 /* This function handles 'background' operations we are required to do
1003 * incrementally in Redis databases, such as active key expiring, resizing,
1004 * rehashing. */
databasesCron(void)1005 void databasesCron(void) {
1006 /* Expire keys by random sampling. Not required for slaves
1007 * as master will synthesize DELs for us. */
1008 if (server.active_expire_enabled) {
1009 if (server.masterhost == NULL) {
1010 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
1011 } else {
1012 expireSlaveKeys();
1013 }
1014 }
1015
1016 /* Defrag keys gradually. */
1017 if (server.active_defrag_enabled)
1018 activeDefragCycle();
1019
1020 /* Perform hash tables rehashing if needed, but only if there are no
1021 * other processes saving the DB on disk. Otherwise rehashing is bad
1022 * as will cause a lot of copy-on-write of memory pages. */
1023 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
1024 /* We use global counters so if we stop the computation at a given
1025 * DB we'll be able to start from the successive in the next
1026 * cron loop iteration. */
1027 static unsigned int resize_db = 0;
1028 static unsigned int rehash_db = 0;
1029 int dbs_per_call = CRON_DBS_PER_CALL;
1030 int j;
1031
1032 /* Don't test more DBs than we have. */
1033 if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
1034
1035 /* Resize */
1036 for (j = 0; j < dbs_per_call; j++) {
1037 tryResizeHashTables(resize_db % server.dbnum);
1038 resize_db++;
1039 }
1040
1041 /* Rehash */
1042 if (server.activerehashing) {
1043 for (j = 0; j < dbs_per_call; j++) {
1044 int work_done = incrementallyRehash(rehash_db);
1045 if (work_done) {
1046 /* If the function did some work, stop here, we'll do
1047 * more at the next cron loop. */
1048 break;
1049 } else {
1050 /* If this db didn't need rehash, we'll try the next one. */
1051 rehash_db++;
1052 rehash_db %= server.dbnum;
1053 }
1054 }
1055 }
1056 }
1057 }
1058
1059 /* We take a cached value of the unix time in the global state because with
1060 * virtual memory and aging there is to store the current time in objects at
1061 * every object access, and accuracy is not needed. To access a global var is
1062 * a lot faster than calling time(NULL) */
updateCachedTime(void)1063 void updateCachedTime(void) {
1064 time_t unixtime = time(NULL);
1065 atomicSet(server.unixtime,unixtime);
1066 server.mstime = mstime();
1067
1068 /* To get information about daylight saving time, we need to call localtime_r
1069 * and cache the result. However calling localtime_r in this context is safe
1070 * since we will never fork() while here, in the main thread. The logging
1071 * function will call a thread safe version of localtime that has no locks. */
1072 struct tm tm;
1073 localtime_r(&server.unixtime,&tm);
1074 server.daylight_active = tm.tm_isdst;
1075 }
1076
1077 /* This is our timer interrupt, called server.hz times per second.
1078 * Here is where we do a number of things that need to be done asynchronously.
1079 * For instance:
1080 *
1081 * - Active expired keys collection (it is also performed in a lazy way on
1082 * lookup).
1083 * - Software watchdog.
1084 * - Update some statistic.
1085 * - Incremental rehashing of the DBs hash tables.
1086 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
1087 * - Clients timeout of different kinds.
1088 * - Replication reconnection.
1089 * - Many more...
1090 *
1091 * Everything directly called here will be called server.hz times per second,
1092 * so in order to throttle execution of things we want to do less frequently
1093 * a macro is used: run_with_period(milliseconds) { .... }
1094 */
1095
serverCron(struct aeEventLoop * eventLoop,long long id,void * clientData)1096 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1097 int j;
1098 UNUSED(eventLoop);
1099 UNUSED(id);
1100 UNUSED(clientData);
1101
1102 /* Software watchdog: deliver the SIGALRM that will reach the signal
1103 * handler if we don't return here fast enough. */
1104 if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
1105
1106 /* Update the time cache. */
1107 updateCachedTime();
1108
1109 server.hz = server.config_hz;
1110 /* Adapt the server.hz value to the number of configured clients. If we have
1111 * many clients, we want to call serverCron() with an higher frequency. */
1112 if (server.dynamic_hz) {
1113 while (listLength(server.clients) / server.hz >
1114 MAX_CLIENTS_PER_CLOCK_TICK)
1115 {
1116 server.hz *= 2;
1117 if (server.hz > CONFIG_MAX_HZ) {
1118 server.hz = CONFIG_MAX_HZ;
1119 break;
1120 }
1121 }
1122 }
1123
1124 run_with_period(100) {
1125 trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
1126 trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
1127 server.stat_net_input_bytes);
1128 trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
1129 server.stat_net_output_bytes);
1130 }
1131
1132 /* We have just LRU_BITS bits per object for LRU information.
1133 * So we use an (eventually wrapping) LRU clock.
1134 *
1135 * Note that even if the counter wraps it's not a big problem,
1136 * everything will still work but some object will appear younger
1137 * to Redis. However for this to happen a given object should never be
1138 * touched for all the time needed to the counter to wrap, which is
1139 * not likely.
1140 *
1141 * Note that you can change the resolution altering the
1142 * LRU_CLOCK_RESOLUTION define. */
1143 unsigned long lruclock = getLRUClock();
1144 atomicSet(server.lruclock,lruclock);
1145
1146 /* Record the max memory used since the server was started. */
1147 if (zmalloc_used_memory() > server.stat_peak_memory)
1148 server.stat_peak_memory = zmalloc_used_memory();
1149
1150 run_with_period(100) {
1151 /* Sample the RSS and other metrics here since this is a relatively slow call.
1152 * We must sample the zmalloc_used at the same time we take the rss, otherwise
1153 * the frag ratio calculate may be off (ratio of two samples at different times) */
1154 server.cron_malloc_stats.process_rss = zmalloc_get_rss();
1155 server.cron_malloc_stats.zmalloc_used = zmalloc_used_memory();
1156 /* Sampling the allcator info can be slow too.
1157 * The fragmentation ratio it'll show is potentically more accurate
1158 * it excludes other RSS pages such as: shared libraries, LUA and other non-zmalloc
1159 * allocations, and allocator reserved pages that can be pursed (all not actual frag) */
1160 zmalloc_get_allocator_info(&server.cron_malloc_stats.allocator_allocated,
1161 &server.cron_malloc_stats.allocator_active,
1162 &server.cron_malloc_stats.allocator_resident);
1163 /* in case the allocator isn't providing these stats, fake them so that
1164 * fragmention info still shows some (inaccurate metrics) */
1165 if (!server.cron_malloc_stats.allocator_resident) {
1166 /* LUA memory isn't part of zmalloc_used, but it is part of the process RSS,
1167 * so we must desuct it in order to be able to calculate correct
1168 * "allocator fragmentation" ratio */
1169 size_t lua_memory = lua_gc(server.lua,LUA_GCCOUNT,0)*1024LL;
1170 server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory;
1171 }
1172 if (!server.cron_malloc_stats.allocator_active)
1173 server.cron_malloc_stats.allocator_active = server.cron_malloc_stats.allocator_resident;
1174 if (!server.cron_malloc_stats.allocator_allocated)
1175 server.cron_malloc_stats.allocator_allocated = server.cron_malloc_stats.zmalloc_used;
1176 }
1177
1178 /* We received a SIGTERM, shutting down here in a safe way, as it is
1179 * not ok doing so inside the signal handler. */
1180 if (server.shutdown_asap) {
1181 if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
1182 serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
1183 server.shutdown_asap = 0;
1184 }
1185
1186 /* Show some info about non-empty databases */
1187 run_with_period(5000) {
1188 for (j = 0; j < server.dbnum; j++) {
1189 long long size, used, vkeys;
1190
1191 size = dictSlots(server.db[j].dict);
1192 used = dictSize(server.db[j].dict);
1193 vkeys = dictSize(server.db[j].expires);
1194 if (used || vkeys) {
1195 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1196 /* dictPrintStats(server.dict); */
1197 }
1198 }
1199 }
1200
1201 /* Show information about connected clients */
1202 if (!server.sentinel_mode) {
1203 run_with_period(5000) {
1204 serverLog(LL_VERBOSE,
1205 "%lu clients connected (%lu replicas), %zu bytes in use",
1206 listLength(server.clients)-listLength(server.slaves),
1207 listLength(server.slaves),
1208 zmalloc_used_memory());
1209 }
1210 }
1211
1212 /* We need to do a few operations on clients asynchronously. */
1213 clientsCron();
1214
1215 /* Handle background operations on Redis databases. */
1216 databasesCron();
1217
1218 /* Start a scheduled AOF rewrite if this was requested by the user while
1219 * a BGSAVE was in progress. */
1220 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
1221 server.aof_rewrite_scheduled)
1222 {
1223 rewriteAppendOnlyFileBackground();
1224 }
1225
1226 /* Check if a background saving or AOF rewrite in progress terminated. */
1227 if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
1228 ldbPendingChildren())
1229 {
1230 int statloc;
1231 pid_t pid;
1232
1233 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1234 int exitcode = WEXITSTATUS(statloc);
1235 int bysignal = 0;
1236
1237 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
1238
1239 if (pid == -1) {
1240 serverLog(LL_WARNING,"wait3() returned an error: %s. "
1241 "rdb_child_pid = %d, aof_child_pid = %d",
1242 strerror(errno),
1243 (int) server.rdb_child_pid,
1244 (int) server.aof_child_pid);
1245 } else if (pid == server.rdb_child_pid) {
1246 backgroundSaveDoneHandler(exitcode,bysignal);
1247 if (!bysignal && exitcode == 0) receiveChildInfo();
1248 } else if (pid == server.aof_child_pid) {
1249 backgroundRewriteDoneHandler(exitcode,bysignal);
1250 if (!bysignal && exitcode == 0) receiveChildInfo();
1251 } else {
1252 if (!ldbRemoveChild(pid)) {
1253 serverLog(LL_WARNING,
1254 "Warning, detected child with unmatched pid: %ld",
1255 (long)pid);
1256 }
1257 }
1258 updateDictResizePolicy();
1259 closeChildInfoPipe();
1260 }
1261 } else {
1262 /* If there is not a background saving/rewrite in progress check if
1263 * we have to save/rewrite now. */
1264 for (j = 0; j < server.saveparamslen; j++) {
1265 struct saveparam *sp = server.saveparams+j;
1266
1267 /* Save if we reached the given amount of changes,
1268 * the given amount of seconds, and if the latest bgsave was
1269 * successful or if, in case of an error, at least
1270 * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
1271 if (server.dirty >= sp->changes &&
1272 server.unixtime-server.lastsave > sp->seconds &&
1273 (server.unixtime-server.lastbgsave_try >
1274 CONFIG_BGSAVE_RETRY_DELAY ||
1275 server.lastbgsave_status == C_OK))
1276 {
1277 serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
1278 sp->changes, (int)sp->seconds);
1279 rdbSaveInfo rsi, *rsiptr;
1280 rsiptr = rdbPopulateSaveInfo(&rsi);
1281 rdbSaveBackground(server.rdb_filename,rsiptr);
1282 break;
1283 }
1284 }
1285
1286 /* Trigger an AOF rewrite if needed. */
1287 if (server.aof_state == AOF_ON &&
1288 server.rdb_child_pid == -1 &&
1289 server.aof_child_pid == -1 &&
1290 server.aof_rewrite_perc &&
1291 server.aof_current_size > server.aof_rewrite_min_size)
1292 {
1293 long long base = server.aof_rewrite_base_size ?
1294 server.aof_rewrite_base_size : 1;
1295 long long growth = (server.aof_current_size*100/base) - 100;
1296 if (growth >= server.aof_rewrite_perc) {
1297 serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
1298 rewriteAppendOnlyFileBackground();
1299 }
1300 }
1301 }
1302
1303
1304 /* AOF postponed flush: Try at every cron cycle if the slow fsync
1305 * completed. */
1306 if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
1307
1308 /* AOF write errors: in this case we have a buffer to flush as well and
1309 * clear the AOF error in case of success to make the DB writable again,
1310 * however to try every second is enough in case of 'hz' is set to
1311 * an higher frequency. */
1312 run_with_period(1000) {
1313 if (server.aof_last_write_status == C_ERR)
1314 flushAppendOnlyFile(0);
1315 }
1316
1317 /* Close clients that need to be closed asynchronous */
1318 freeClientsInAsyncFreeQueue();
1319
1320 /* Clear the paused clients flag if needed. */
1321 clientsArePaused(); /* Don't check return value, just use the side effect.*/
1322
1323 /* Replication cron function -- used to reconnect to master,
1324 * detect transfer failures, start background RDB transfers and so forth. */
1325 run_with_period(1000) replicationCron();
1326
1327 /* Run the Redis Cluster cron. */
1328 run_with_period(100) {
1329 if (server.cluster_enabled) clusterCron();
1330 }
1331
1332 /* Run the Sentinel timer if we are in sentinel mode. */
1333 if (server.sentinel_mode) sentinelTimer();
1334
1335 /* Cleanup expired MIGRATE cached sockets. */
1336 run_with_period(1000) {
1337 migrateCloseTimedoutSockets();
1338 }
1339
1340 /* Start a scheduled BGSAVE if the corresponding flag is set. This is
1341 * useful when we are forced to postpone a BGSAVE because an AOF
1342 * rewrite is in progress.
1343 *
1344 * Note: this code must be after the replicationCron() call above so
1345 * make sure when refactoring this file to keep this order. This is useful
1346 * because we want to give priority to RDB savings for replication. */
1347 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
1348 server.rdb_bgsave_scheduled &&
1349 (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
1350 server.lastbgsave_status == C_OK))
1351 {
1352 rdbSaveInfo rsi, *rsiptr;
1353 rsiptr = rdbPopulateSaveInfo(&rsi);
1354 if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
1355 server.rdb_bgsave_scheduled = 0;
1356 }
1357
1358 server.cronloops++;
1359 return 1000/server.hz;
1360 }
1361
1362 /* This function gets called every time Redis is entering the
1363 * main loop of the event driven library, that is, before to sleep
1364 * for ready file descriptors. */
beforeSleep(struct aeEventLoop * eventLoop)1365 void beforeSleep(struct aeEventLoop *eventLoop) {
1366 UNUSED(eventLoop);
1367
1368 /* Call the Redis Cluster before sleep function. Note that this function
1369 * may change the state of Redis Cluster (from ok to fail or vice versa),
1370 * so it's a good idea to call it before serving the unblocked clients
1371 * later in this function. */
1372 if (server.cluster_enabled) clusterBeforeSleep();
1373
1374 /* Run a fast expire cycle (the called function will return
1375 * ASAP if a fast cycle is not needed). */
1376 if (server.active_expire_enabled && server.masterhost == NULL)
1377 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
1378
1379 /* Send all the slaves an ACK request if at least one client blocked
1380 * during the previous event loop iteration. */
1381 if (server.get_ack_from_slaves) {
1382 robj *argv[3];
1383
1384 argv[0] = createStringObject("REPLCONF",8);
1385 argv[1] = createStringObject("GETACK",6);
1386 argv[2] = createStringObject("*",1); /* Not used argument. */
1387 replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
1388 decrRefCount(argv[0]);
1389 decrRefCount(argv[1]);
1390 decrRefCount(argv[2]);
1391 server.get_ack_from_slaves = 0;
1392 }
1393
1394 /* Unblock all the clients blocked for synchronous replication
1395 * in WAIT. */
1396 if (listLength(server.clients_waiting_acks))
1397 processClientsWaitingReplicas();
1398
1399 /* Check if there are clients unblocked by modules that implement
1400 * blocking commands. */
1401 moduleHandleBlockedClients();
1402
1403 /* Try to process pending commands for clients that were just unblocked. */
1404 if (listLength(server.unblocked_clients))
1405 processUnblockedClients();
1406
1407 /* Write the AOF buffer on disk */
1408 flushAppendOnlyFile(0);
1409
1410 /* Handle writes with pending output buffers. */
1411 handleClientsWithPendingWrites();
1412
1413 /* Before we are going to sleep, let the threads access the dataset by
1414 * releasing the GIL. Redis main thread will not touch anything at this
1415 * time. */
1416 if (moduleCount()) moduleReleaseGIL();
1417 }
1418
/* This function is called immediately after the event loop multiplexing
 * API returned, and the control is going to soon return to Redis by invoking
 * the different events callbacks. The GIL is acquired again, unless
 * moduleCount() reports that there are no modules. */
void afterSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    if (moduleCount() == 0) return;
    moduleAcquireGIL();
}
1426
1427 /* =========================== Server initialization ======================== */
1428
createSharedObjects(void)1429 void createSharedObjects(void) {
1430 int j;
1431
1432 shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
1433 shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
1434 shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
1435 shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
1436 shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
1437 shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
1438 shared.cnegone = createObject(OBJ_STRING,sdsnew(":-1\r\n"));
1439 shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
1440 shared.nullmultibulk = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
1441 shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n"));
1442 shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
1443 shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
1444 shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
1445 shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
1446 "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
1447 shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
1448 "-ERR no such key\r\n"));
1449 shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
1450 "-ERR syntax error\r\n"));
1451 shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
1452 "-ERR source and destination objects are the same\r\n"));
1453 shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
1454 "-ERR index out of range\r\n"));
1455 shared.noscripterr = createObject(OBJ_STRING,sdsnew(
1456 "-NOSCRIPT No matching script. Please use EVAL.\r\n"));
1457 shared.loadingerr = createObject(OBJ_STRING,sdsnew(
1458 "-LOADING Redis is loading the dataset in memory\r\n"));
1459 shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
1460 "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
1461 shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
1462 "-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n"));
1463 shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
1464 "-MISCONF Redis is configured to save RDB snapshots, but it is currently not able to persist on disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.\r\n"));
1465 shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
1466 "-READONLY You can't write against a read only replica.\r\n"));
1467 shared.noautherr = createObject(OBJ_STRING,sdsnew(
1468 "-NOAUTH Authentication required.\r\n"));
1469 shared.oomerr = createObject(OBJ_STRING,sdsnew(
1470 "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
1471 shared.execaborterr = createObject(OBJ_STRING,sdsnew(
1472 "-EXECABORT Transaction discarded because of previous errors.\r\n"));
1473 shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
1474 "-NOREPLICAS Not enough good replicas to write.\r\n"));
1475 shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
1476 "-BUSYKEY Target key name already exists.\r\n"));
1477 shared.space = createObject(OBJ_STRING,sdsnew(" "));
1478 shared.colon = createObject(OBJ_STRING,sdsnew(":"));
1479 shared.plus = createObject(OBJ_STRING,sdsnew("+"));
1480
1481 for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
1482 char dictid_str[64];
1483 int dictid_len;
1484
1485 dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
1486 shared.select[j] = createObject(OBJ_STRING,
1487 sdscatprintf(sdsempty(),
1488 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
1489 dictid_len, dictid_str));
1490 }
1491 shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
1492 shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
1493 shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
1494 shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
1495 shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
1496 shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
1497 shared.del = createStringObject("DEL",3);
1498 shared.unlink = createStringObject("UNLINK",6);
1499 shared.rpop = createStringObject("RPOP",4);
1500 shared.lpop = createStringObject("LPOP",4);
1501 shared.lpush = createStringObject("LPUSH",5);
1502 shared.rpoplpush = createStringObject("RPOPLPUSH",9);
1503 shared.zpopmin = createStringObject("ZPOPMIN",7);
1504 shared.zpopmax = createStringObject("ZPOPMAX",7);
1505 for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
1506 shared.integers[j] =
1507 makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
1508 shared.integers[j]->encoding = OBJ_ENCODING_INT;
1509 }
1510 for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {
1511 shared.mbulkhdr[j] = createObject(OBJ_STRING,
1512 sdscatprintf(sdsempty(),"*%d\r\n",j));
1513 shared.bulkhdr[j] = createObject(OBJ_STRING,
1514 sdscatprintf(sdsempty(),"$%d\r\n",j));
1515 }
1516 /* The following two shared objects, minstring and maxstrings, are not
1517 * actually used for their value but as a special object meaning
1518 * respectively the minimum possible string and the maximum possible
1519 * string in string comparisons for the ZRANGEBYLEX command. */
1520 shared.minstring = sdsnew("minstring");
1521 shared.maxstring = sdsnew("maxstring");
1522 }
1523
/* Fill the global 'server' structure with its built-in default values.
 *
 * The function initializes the mutexes protecting a few shared fields,
 * generates the run ID and replication IDs, assigns the default value of
 * every configuration field touched below, installs the default save
 * points, creates the command tables and caches pointers to a handful of
 * commands that are looked up by name.
 *
 * It takes no arguments and returns nothing: every effect is a write to
 * the global 'server' state (plus the R_* double constants).
 *
 * NOTE(review): presumably this runs before the configuration file is
 * parsed, so that directives can override these defaults -- confirm
 * against the caller. */
void initServerConfig(void) {
    int j;

    pthread_mutex_init(&server.next_client_id_mutex,NULL);
    pthread_mutex_init(&server.lruclock_mutex,NULL);
    pthread_mutex_init(&server.unixtime_mutex,NULL);

    updateCachedTime();
    /* The run ID is filled with random hex characters and then explicitly
     * null terminated at index CONFIG_RUN_ID_SIZE. */
    getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
    server.runid[CONFIG_RUN_ID_SIZE] = '\0';
    changeReplicationId();
    clearReplicationId2();
    server.timezone = getTimeZone(); /* Initialized by tzset(). */
    server.configfile = NULL;
    server.executable = NULL;
    server.hz = server.config_hz = CONFIG_DEFAULT_HZ;
    server.dynamic_hz = CONFIG_DEFAULT_DYNAMIC_HZ;
    /* 64 when 'long' is 8 bytes wide, 32 otherwise. */
    server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
    server.port = CONFIG_DEFAULT_SERVER_PORT;
    server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
    server.bindaddr_count = 0;
    server.unixsocket = NULL;
    server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
    server.ipfd_count = 0;
    server.sofd = -1;
    server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE;
    server.dbnum = CONFIG_DEFAULT_DBNUM;
    server.verbosity = CONFIG_DEFAULT_VERBOSITY;
    server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
    server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE;
    server.active_expire_enabled = 1;
    server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG;
    server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES;
    server.active_defrag_threshold_lower = CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER;
    server.active_defrag_threshold_upper = CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER;
    server.active_defrag_cycle_min = CONFIG_DEFAULT_DEFRAG_CYCLE_MIN;
    server.active_defrag_cycle_max = CONFIG_DEFAULT_DEFRAG_CYCLE_MAX;
    server.active_defrag_max_scan_fields = CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS;
    server.proto_max_bulk_len = CONFIG_DEFAULT_PROTO_MAX_BULK_LEN;
    server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
    server.saveparams = NULL;
    server.loading = 0;
    /* String defaults are duplicated with zstrdup(), so these fields hold
     * heap copies rather than pointers to the constants themselves. */
    server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);
    server.syslog_enabled = CONFIG_DEFAULT_SYSLOG_ENABLED;
    server.syslog_ident = zstrdup(CONFIG_DEFAULT_SYSLOG_IDENT);
    server.syslog_facility = LOG_LOCAL0;
    server.daemonize = CONFIG_DEFAULT_DAEMONIZE;
    server.supervised = 0;
    server.supervised_mode = SUPERVISED_NONE;
    server.aof_state = AOF_OFF;
    server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC;
    server.aof_no_fsync_on_rewrite = CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE;
    server.aof_rewrite_perc = AOF_REWRITE_PERC;
    server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
    server.aof_rewrite_base_size = 0;
    server.aof_rewrite_scheduled = 0;
    server.aof_last_fsync = time(NULL);
    server.aof_rewrite_time_last = -1;
    server.aof_rewrite_time_start = -1;
    server.aof_lastbgrewrite_status = C_OK;
    server.aof_delayed_fsync = 0;
    server.aof_fd = -1;
    server.aof_selected_db = -1; /* Make sure the first time will not match */
    server.aof_flush_postponed_start = 0;
    server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
    server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC;
    server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
    server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE;
    server.pidfile = NULL;
    server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
    server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
    server.requirepass = NULL;
    server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION;
    server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM;
    server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
    server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING;
    server.active_defrag_running = 0;
    server.notify_keyspace_events = 0;
    server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
    server.blocked_clients = 0;
    memset(server.blocked_clients_by_type,0,
           sizeof(server.blocked_clients_by_type));
    server.maxmemory = CONFIG_DEFAULT_MAXMEMORY;
    server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY;
    server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES;
    server.lfu_log_factor = CONFIG_DEFAULT_LFU_LOG_FACTOR;
    server.lfu_decay_time = CONFIG_DEFAULT_LFU_DECAY_TIME;
    server.hash_max_ziplist_entries = OBJ_HASH_MAX_ZIPLIST_ENTRIES;
    server.hash_max_ziplist_value = OBJ_HASH_MAX_ZIPLIST_VALUE;
    server.list_max_ziplist_size = OBJ_LIST_MAX_ZIPLIST_SIZE;
    server.list_compress_depth = OBJ_LIST_COMPRESS_DEPTH;
    server.set_max_intset_entries = OBJ_SET_MAX_INTSET_ENTRIES;
    server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES;
    server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE;
    server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES;
    server.stream_node_max_bytes = OBJ_STREAM_NODE_MAX_BYTES;
    server.stream_node_max_entries = OBJ_STREAM_NODE_MAX_ENTRIES;
    server.shutdown_asap = 0;
    server.cluster_enabled = 0;
    server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT;
    server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER;
    server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY;
    server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE;
    server.cluster_slave_no_failover = CLUSTER_DEFAULT_SLAVE_NO_FAILOVER;
    server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
    server.cluster_announce_ip = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_IP;
    server.cluster_announce_port = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_PORT;
    server.cluster_announce_bus_port = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_BUS_PORT;
    server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;
    server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
    server.next_client_id = 1; /* Client IDs, start from 1 .*/
    server.loading_process_events_interval_bytes = (1024*1024*2);
    server.lazyfree_lazy_eviction = CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION;
    server.lazyfree_lazy_expire = CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE;
    server.lazyfree_lazy_server_del = CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL;
    server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO;
    server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT;

    /* Seed the LRU clock through the atomicSet() helper. */
    unsigned int lruclock = getLRUClock();
    atomicSet(server.lruclock,lruclock);
    resetServerSaveParams();

    /* Default save points, appended after the list has been reset. */
    appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
    appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
    appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */

    /* Replication related */
    server.masterauth = NULL;
    server.masterhost = NULL;
    server.masterport = 6379;
    server.master = NULL;
    server.cached_master = NULL;
    server.master_initial_offset = -1;
    server.repl_state = REPL_STATE_NONE;
    server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
    server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
    server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
    server.repl_slave_ignore_maxmemory = CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY;
    server.repl_slave_lazy_flush = CONFIG_DEFAULT_SLAVE_LAZY_FLUSH;
    server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
    server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
    server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
    server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
    server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
    server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
    server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE;
    server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG;
    server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY;
    server.slave_announce_ip = CONFIG_DEFAULT_SLAVE_ANNOUNCE_IP;
    server.slave_announce_port = CONFIG_DEFAULT_SLAVE_ANNOUNCE_PORT;
    server.master_repl_offset = 0;

    /* Replication partial resync backlog */
    server.repl_backlog = NULL;
    server.repl_backlog_size = CONFIG_DEFAULT_REPL_BACKLOG_SIZE;
    server.repl_backlog_histlen = 0;
    server.repl_backlog_idx = 0;
    server.repl_backlog_off = 0;
    server.repl_backlog_time_limit = CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
    server.repl_no_slaves_since = time(NULL);

    /* Client output buffer limits */
    for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
        server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];

    /* Double constants initialization. The infinities and the NaN are
     * produced at runtime by dividing by R_Zero (assumes IEEE 754
     * floating point semantics for division by zero). */
    R_Zero = 0.0;
    R_PosInf = 1.0/R_Zero;
    R_NegInf = -1.0/R_Zero;
    R_Nan = R_Zero/R_Zero;

    /* Command table -- we initialize it here as it is part of the
     * initial configuration, since command names may be changed via
     * redis.conf using the rename-command directive. */
    server.commands = dictCreate(&commandTableDictType,NULL);
    server.orig_commands = dictCreate(&commandTableDictType,NULL);
    populateCommandTable();
    /* Cache pointers to a few commands. These lookups must stay after
     * populateCommandTable(), which is what fills server.commands. */
    server.delCommand = lookupCommandByCString("del");
    server.multiCommand = lookupCommandByCString("multi");
    server.lpushCommand = lookupCommandByCString("lpush");
    server.lpopCommand = lookupCommandByCString("lpop");
    server.rpopCommand = lookupCommandByCString("rpop");
    server.zpopminCommand = lookupCommandByCString("zpopmin");
    server.zpopmaxCommand = lookupCommandByCString("zpopmax");
    server.sremCommand = lookupCommandByCString("srem");
    server.execCommand = lookupCommandByCString("exec");
    server.expireCommand = lookupCommandByCString("expire");
    server.pexpireCommand = lookupCommandByCString("pexpire");
    server.xclaimCommand = lookupCommandByCString("xclaim");
    server.xgroupCommand = lookupCommandByCString("xgroup");

    /* Slow log */
    server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;
    server.slowlog_max_len = CONFIG_DEFAULT_SLOWLOG_MAX_LEN;

    /* Latency monitor */
    server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD;

    /* Debugging */
    server.assert_failed = "<no assertion failed>";
    server.assert_file = "<no file>";
    server.assert_line = 0;
    server.bug_report_start = 0;
    server.watchdog_period = 0;

    /* By default we want scripts to be always replicated by effects
     * (single commands executed by the script), and not by sending the
     * script to the slave / AOF. This is the new way starting from
     * Redis 5. However it is possible to revert it via redis.conf. */
    server.lua_always_replicate_commands = 1;
}
1735
1736 extern char **environ;
1737
1738 /* Restart the server, executing the same executable that started this
1739 * instance, with the same arguments and configuration file.
1740 *
1741 * The function is designed to directly call execve() so that the new
1742 * server instance will retain the PID of the previous one.
1743 *
1744 * The list of flags, that may be bitwise ORed together, alter the
1745 * behavior of this function:
1746 *
1747 * RESTART_SERVER_NONE No flags.
1748 * RESTART_SERVER_GRACEFULLY Do a proper shutdown before restarting.
1749 * RESTART_SERVER_CONFIG_REWRITE Rewrite the config file before restarting.
1750 *
1751 * On success the function does not return, because the process turns into
1752 * a different process. On error C_ERR is returned. */
restartServer(int flags,mstime_t delay)1753 int restartServer(int flags, mstime_t delay) {
1754 int j;
1755
1756 /* Check if we still have accesses to the executable that started this
1757 * server instance. */
1758 if (access(server.executable,X_OK) == -1) {
1759 serverLog(LL_WARNING,"Can't restart: this process has no "
1760 "permissions to execute %s", server.executable);
1761 return C_ERR;
1762 }
1763
1764 /* Config rewriting. */
1765 if (flags & RESTART_SERVER_CONFIG_REWRITE &&
1766 server.configfile &&
1767 rewriteConfig(server.configfile) == -1)
1768 {
1769 serverLog(LL_WARNING,"Can't restart: configuration rewrite process "
1770 "failed");
1771 return C_ERR;
1772 }
1773
1774 /* Perform a proper shutdown. */
1775 if (flags & RESTART_SERVER_GRACEFULLY &&
1776 prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK)
1777 {
1778 serverLog(LL_WARNING,"Can't restart: error preparing for shutdown");
1779 return C_ERR;
1780 }
1781
1782 /* Close all file descriptors, with the exception of stdin, stdout, strerr
1783 * which are useful if we restart a Redis server which is not daemonized. */
1784 for (j = 3; j < (int)server.maxclients + 1024; j++) {
1785 /* Test the descriptor validity before closing it, otherwise
1786 * Valgrind issues a warning on close(). */
1787 if (fcntl(j,F_GETFD) != -1) close(j);
1788 }
1789
1790 /* Execute the server with the original command line. */
1791 if (delay) usleep(delay*1000);
1792 zfree(server.exec_argv[0]);
1793 server.exec_argv[0] = zstrdup(server.executable);
1794 execve(server.executable,server.exec_argv,environ);
1795
1796 /* If an error occurred here, there is nothing we can do, but exit. */
1797 _exit(1);
1798
1799 return C_ERR; /* Never reached. */
1800 }
1801
1802 /* This function will try to raise the max number of open files accordingly to
1803 * the configured max number of clients. It also reserves a number of file
1804 * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
1805 * persistence, listening sockets, log files and so forth.
1806 *
1807 * If it will not be possible to set the limit accordingly to the configured
1808 * max number of clients, the function will do the reverse setting
1809 * server.maxclients to the value that we can actually handle. */
adjustOpenFilesLimit(void)1810 void adjustOpenFilesLimit(void) {
1811 rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS;
1812 struct rlimit limit;
1813
1814 if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {
1815 serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
1816 strerror(errno));
1817 server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS;
1818 } else {
1819 rlim_t oldlimit = limit.rlim_cur;
1820
1821 /* Set the max number of files if the current limit is not enough
1822 * for our needs. */
1823 if (oldlimit < maxfiles) {
1824 rlim_t bestlimit;
1825 int setrlimit_error = 0;
1826
1827 /* Try to set the file limit to match 'maxfiles' or at least
1828 * to the higher value supported less than maxfiles. */
1829 bestlimit = maxfiles;
1830 while(bestlimit > oldlimit) {
1831 rlim_t decr_step = 16;
1832
1833 limit.rlim_cur = bestlimit;
1834 limit.rlim_max = bestlimit;
1835 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break;
1836 setrlimit_error = errno;
1837
1838 /* We failed to set file limit to 'bestlimit'. Try with a
1839 * smaller limit decrementing by a few FDs per iteration. */
1840 if (bestlimit < decr_step) break;
1841 bestlimit -= decr_step;
1842 }
1843
1844 /* Assume that the limit we get initially is still valid if
1845 * our last try was even lower. */
1846 if (bestlimit < oldlimit) bestlimit = oldlimit;
1847
1848 if (bestlimit < maxfiles) {
1849 unsigned int old_maxclients = server.maxclients;
1850 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS;
1851 /* maxclients is unsigned so may overflow: in order
1852 * to check if maxclients is now logically less than 1
1853 * we test indirectly via bestlimit. */
1854 if (bestlimit <= CONFIG_MIN_RESERVED_FDS) {
1855 serverLog(LL_WARNING,"Your current 'ulimit -n' "
1856 "of %llu is not enough for the server to start. "
1857 "Please increase your open file limit to at least "
1858 "%llu. Exiting.",
1859 (unsigned long long) oldlimit,
1860 (unsigned long long) maxfiles);
1861 exit(1);
1862 }
1863 serverLog(LL_WARNING,"You requested maxclients of %d "
1864 "requiring at least %llu max file descriptors.",
1865 old_maxclients,
1866 (unsigned long long) maxfiles);
1867 serverLog(LL_WARNING,"Server can't set maximum open files "
1868 "to %llu because of OS error: %s.",
1869 (unsigned long long) maxfiles, strerror(setrlimit_error));
1870 serverLog(LL_WARNING,"Current maximum open files is %llu. "
1871 "maxclients has been reduced to %d to compensate for "
1872 "low ulimit. "
1873 "If you need higher maxclients increase 'ulimit -n'.",
1874 (unsigned long long) bestlimit, server.maxclients);
1875 } else {
1876 serverLog(LL_NOTICE,"Increased maximum number of open files "
1877 "to %llu (it was originally set to %llu).",
1878 (unsigned long long) maxfiles,
1879 (unsigned long long) oldlimit);
1880 }
1881 }
1882 }
1883 }
1884
/* On systems where HAVE_PROC_SOMAXCONN is defined, read
 * /proc/sys/net/core/somaxconn and log a warning when its value is positive
 * and lower than server.tcp_backlog, since the configured backlog cannot be
 * enforced in that case. If the file cannot be opened or read, nothing is
 * reported. Without HAVE_PROC_SOMAXCONN the function is a no-op. */
void checkTcpBacklogSettings(void) {
#ifdef HAVE_PROC_SOMAXCONN
    char line[1024];
    FILE *procfile = fopen("/proc/sys/net/core/somaxconn","r");

    if (procfile == NULL) return;

    if (fgets(line,sizeof(line),procfile) != NULL) {
        int kernel_limit = atoi(line);

        /* A non positive value is ignored rather than compared. */
        if (kernel_limit > 0) {
            if (kernel_limit < server.tcp_backlog) {
                serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, kernel_limit);
            }
        }
    }
    fclose(procfile);
#endif
}
1901
1902 /* Initialize a set of file descriptors to listen to the specified 'port'
1903 * binding the addresses specified in the Redis server configuration.
1904 *
1905 * The listening file descriptors are stored in the integer array 'fds'
1906 * and their number is set in '*count'.
1907 *
1908 * The addresses to bind are specified in the global server.bindaddr array
1909 * and their number is server.bindaddr_count. If the server configuration
1910 * contains no specific addresses to bind, this function will try to
1911 * bind * (all addresses) for both the IPv4 and IPv6 protocols.
1912 *
1913 * On success the function returns C_OK.
1914 *
1915 * On error the function returns C_ERR. For the function to be on
1916 * error, at least one of the server.bindaddr addresses was
1917 * impossible to bind, or no bind addresses were specified in the server
1918 * configuration but the function is not able to bind * for at least
1919 * one of the IPv4 or IPv6 protocols. */
listenToPort(int port,int * fds,int * count)1920 int listenToPort(int port, int *fds, int *count) {
1921 int j;
1922
1923 /* Force binding of 0.0.0.0 if no bind address is specified, always
1924 * entering the loop if j == 0. */
1925 if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
1926 for (j = 0; j < server.bindaddr_count || j == 0; j++) {
1927 if (server.bindaddr[j] == NULL) {
1928 int unsupported = 0;
1929 /* Bind * for both IPv6 and IPv4, we enter here only if
1930 * server.bindaddr_count == 0. */
1931 fds[*count] = anetTcp6Server(server.neterr,port,NULL,
1932 server.tcp_backlog);
1933 if (fds[*count] != ANET_ERR) {
1934 anetNonBlock(NULL,fds[*count]);
1935 (*count)++;
1936 } else if (errno == EAFNOSUPPORT) {
1937 unsupported++;
1938 serverLog(LL_WARNING,"Not listening to IPv6: unsupproted");
1939 }
1940
1941 if (*count == 1 || unsupported) {
1942 /* Bind the IPv4 address as well. */
1943 fds[*count] = anetTcpServer(server.neterr,port,NULL,
1944 server.tcp_backlog);
1945 if (fds[*count] != ANET_ERR) {
1946 anetNonBlock(NULL,fds[*count]);
1947 (*count)++;
1948 } else if (errno == EAFNOSUPPORT) {
1949 unsupported++;
1950 serverLog(LL_WARNING,"Not listening to IPv4: unsupproted");
1951 }
1952 }
1953 /* Exit the loop if we were able to bind * on IPv4 and IPv6,
1954 * otherwise fds[*count] will be ANET_ERR and we'll print an
1955 * error and return to the caller with an error. */
1956 if (*count + unsupported == 2) break;
1957 } else if (strchr(server.bindaddr[j],':')) {
1958 /* Bind IPv6 address. */
1959 fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
1960 server.tcp_backlog);
1961 } else {
1962 /* Bind IPv4 address. */
1963 fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
1964 server.tcp_backlog);
1965 }
1966 if (fds[*count] == ANET_ERR) {
1967 serverLog(LL_WARNING,
1968 "Could not create server TCP listening socket %s:%d: %s",
1969 server.bindaddr[j] ? server.bindaddr[j] : "*",
1970 port, server.neterr);
1971 if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT ||
1972 errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
1973 errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL)
1974 continue;
1975 return C_ERR;
1976 }
1977 anetNonBlock(NULL,fds[*count]);
1978 (*count)++;
1979 }
1980 return C_OK;
1981 }
1982
1983 /* Resets the stats that we expose via INFO or other means that we want
1984 * to reset via CONFIG RESETSTAT. The function is also used in order to
1985 * initialize these fields in initServer() at server startup. */
resetServerStats(void)1986 void resetServerStats(void) {
1987 int j;
1988
1989 server.stat_numcommands = 0;
1990 server.stat_numconnections = 0;
1991 server.stat_expiredkeys = 0;
1992 server.stat_expired_stale_perc = 0;
1993 server.stat_expired_time_cap_reached_count = 0;
1994 server.stat_evictedkeys = 0;
1995 server.stat_keyspace_misses = 0;
1996 server.stat_keyspace_hits = 0;
1997 server.stat_active_defrag_hits = 0;
1998 server.stat_active_defrag_misses = 0;
1999 server.stat_active_defrag_key_hits = 0;
2000 server.stat_active_defrag_key_misses = 0;
2001 server.stat_active_defrag_scanned = 0;
2002 server.stat_fork_time = 0;
2003 server.stat_fork_rate = 0;
2004 server.stat_rejected_conn = 0;
2005 server.stat_sync_full = 0;
2006 server.stat_sync_partial_ok = 0;
2007 server.stat_sync_partial_err = 0;
2008 for (j = 0; j < STATS_METRIC_COUNT; j++) {
2009 server.inst_metric[j].idx = 0;
2010 server.inst_metric[j].last_sample_time = mstime();
2011 server.inst_metric[j].last_sample_count = 0;
2012 memset(server.inst_metric[j].samples,0,
2013 sizeof(server.inst_metric[j].samples));
2014 }
2015 server.stat_net_input_bytes = 0;
2016 server.stat_net_output_bytes = 0;
2017 server.aof_delayed_fsync = 0;
2018 }
2019
/* Set up the runtime state of the server.
 *
 * In order, the function: installs the signal dispositions, optionally
 * opens syslog, creates the client related lists, creates the shared
 * objects, adjusts the open files limit, creates the event loop, opens the
 * TCP and Unix listening sockets, creates the databases and the Pub/Sub
 * structures, resets persistence state and statistics, registers the
 * serverCron timer and the accept handlers, opens the AOF file if AOF is
 * on, applies the 32 bit default memory limit, and finally initializes the
 * cluster (if enabled), scripting, slow log, latency monitor and
 * background I/O subsystems.
 *
 * Failures to create the event loop, to open a listening socket or to open
 * the AOF file are logged and terminate the process with exit(1). Failures
 * registering events are reported via serverPanic(). */
void initServer(void) {
    int j;

    /* SIGHUP and SIGPIPE are ignored. */
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    setupSignalHandlers();

    if (server.syslog_enabled) {
        openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
            server.syslog_facility);
    }

    server.hz = server.config_hz;
    server.pid = getpid();
    server.current_client = NULL;
    server.clients = listCreate();
    server.clients_index = raxNew();
    server.clients_to_close = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.clients_pending_write = listCreate();
    server.slaveseldb = -1; /* Force to emit the first SELECT command. */
    server.unblocked_clients = listCreate();
    server.ready_keys = listCreate();
    server.clients_waiting_acks = listCreate();
    server.get_ack_from_slaves = 0;
    server.clients_paused = 0;
    server.system_memory_size = zmalloc_get_memory_size();

    createSharedObjects();
    /* adjustOpenFilesLimit() may change server.maxclients, which is used
     * right below to size the event loop. */
    adjustOpenFilesLimit();
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    if (server.el == NULL) {
        serverLog(LL_WARNING,
            "Failed creating the event loop. Error message: '%s'",
            strerror(errno));
        exit(1);
    }
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);

    /* Open the TCP listening socket for the user commands. */
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);

    /* Open the listening Unix domain socket. */
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {
            serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL,server.sofd);
    }

    /* Abort if there are no listening sockets at all. */
    if (server.ipfd_count == 0 && server.sofd < 0) {
        serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
        exit(1);
    }

    /* Create the Redis databases, and initialize other internal state. */
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&dbDictType,NULL);
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
        server.db[j].defrag_later = listCreate();
    }
    evictionPoolAlloc(); /* Initialize the LRU keys pool. */
    server.pubsub_channels = dictCreate(&keylistDictType,NULL);
    server.pubsub_patterns = listCreate();
    listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
    listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
    server.cronloops = 0;
    /* No child process and no child info pipe at startup. */
    server.rdb_child_pid = -1;
    server.aof_child_pid = -1;
    server.rdb_child_type = RDB_CHILD_TYPE_NONE;
    server.rdb_bgsave_scheduled = 0;
    server.child_info_pipe[0] = -1;
    server.child_info_pipe[1] = -1;
    server.child_info_data.magic = 0;
    aofRewriteBufferReset();
    server.aof_buf = sdsempty();
    server.lastsave = time(NULL); /* At startup we consider the DB saved. */
    server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
    server.rdb_save_time_last = -1;
    server.rdb_save_time_start = -1;
    server.dirty = 0;
    resetServerStats();
    /* A few stats we don't want to reset: server startup time, and peak mem. */
    server.stat_starttime = time(NULL);
    server.stat_peak_memory = 0;
    server.stat_rdb_cow_bytes = 0;
    server.stat_aof_cow_bytes = 0;
    server.cron_malloc_stats.zmalloc_used = 0;
    server.cron_malloc_stats.process_rss = 0;
    server.cron_malloc_stats.allocator_allocated = 0;
    server.cron_malloc_stats.allocator_active = 0;
    server.cron_malloc_stats.allocator_resident = 0;
    server.lastbgsave_status = C_OK;
    server.aof_last_write_status = C_OK;
    server.aof_last_write_errno = 0;
    server.repl_good_slaves_count = 0;

    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth.
     * NOTE(review): the exit(1) below is presumably unreachable if
     * serverPanic() does not return -- confirm. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    /* NOTE(review): the test is 'sofd > 0', so a Unix socket that happened
     * to get descriptor 0 would not be registered, while the check on
     * listening sockets above uses 'sofd < 0' -- confirm this is intended. */
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

#ifndef HAVE_FF_KQUEUE
    /* Register a readable event for the pipe used to awake the event loop
     * when a blocked client in a module needs attention. This step is
     * compiled out when HAVE_FF_KQUEUE is defined. */
    if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
        moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
            serverPanic(
                "Error registering the readable event for the module "
                "blocked clients subsystem.");
    }
#endif

    /* Open the AOF file if needed. */
    if (server.aof_state == AOF_ON) {
        server.aof_fd = open(server.aof_filename,
                               O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.aof_fd == -1) {
            serverLog(LL_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    /* 32 bit instances are limited to 4GB of address space, so if there is
     * no explicit limit in the user provided configuration we set a limit
     * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
     * useless crashes of the Redis instance for out of memory. */
    if (server.arch_bits == 32 && server.maxmemory == 0) {
        serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
        server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
        server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
    }

    if (server.cluster_enabled) clusterInit();
    replicationScriptCacheInit();
    scriptingInit(1);
    slowlogInit();
    latencyMonitorInit();
    bioInit();
    /* Memory in use once all of the above has been initialized. */
    server.initial_memory_usage = zmalloc_used_memory();
}
2191
2192 /* Populates the Redis Command Table starting from the hard coded list
2193 * we have on top of redis.c file. */
populateCommandTable(void)2194 void populateCommandTable(void) {
2195 int j;
2196 int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
2197
2198 for (j = 0; j < numcommands; j++) {
2199 struct redisCommand *c = redisCommandTable+j;
2200 char *f = c->sflags;
2201 int retval1, retval2;
2202
2203 while(*f != '\0') {
2204 switch(*f) {
2205 case 'w': c->flags |= CMD_WRITE; break;
2206 case 'r': c->flags |= CMD_READONLY; break;
2207 case 'm': c->flags |= CMD_DENYOOM; break;
2208 case 'a': c->flags |= CMD_ADMIN; break;
2209 case 'p': c->flags |= CMD_PUBSUB; break;
2210 case 's': c->flags |= CMD_NOSCRIPT; break;
2211 case 'R': c->flags |= CMD_RANDOM; break;
2212 case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
2213 case 'l': c->flags |= CMD_LOADING; break;
2214 case 't': c->flags |= CMD_STALE; break;
2215 case 'M': c->flags |= CMD_SKIP_MONITOR; break;
2216 case 'k': c->flags |= CMD_ASKING; break;
2217 case 'F': c->flags |= CMD_FAST; break;
2218 default: serverPanic("Unsupported command flag"); break;
2219 }
2220 f++;
2221 }
2222
2223 retval1 = dictAdd(server.commands, sdsnew(c->name), c);
2224 /* Populate an additional dictionary that will be unaffected
2225 * by rename-command statements in redis.conf. */
2226 retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
2227 serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
2228 }
2229 }
2230
resetCommandTableStats(void)2231 void resetCommandTableStats(void) {
2232 struct redisCommand *c;
2233 dictEntry *de;
2234 dictIterator *di;
2235
2236 di = dictGetSafeIterator(server.commands);
2237 while((de = dictNext(di)) != NULL) {
2238 c = (struct redisCommand *) dictGetVal(de);
2239 c->microseconds = 0;
2240 c->calls = 0;
2241 }
2242 dictReleaseIterator(di);
2243
2244 }
2245
2246 /* ========================== Redis OP Array API ============================ */
2247
/* Put the operations array 'oa' in its empty state: zero operations and
 * no allocated storage. */
void redisOpArrayInit(redisOpArray *oa) {
    oa->numops = 0;
    oa->ops = NULL;
}
2252
/* Append one operation to 'oa', growing the backing array by one slot.
 *
 * The 'argv' pointer is stored as-is (the vector is not copied here).
 * Returns the number of operations held by the array after the append. */
int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
                       robj **argv, int argc, int target)
{
    int slot = oa->numops;
    redisOp *entry;

    oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(slot+1));
    entry = &oa->ops[slot];
    entry->cmd = cmd;
    entry->dbid = dbid;
    entry->argv = argv;
    entry->argc = argc;
    entry->target = target;

    oa->numops = slot+1;
    return oa->numops;
}
2268
/* Release everything owned by 'oa': for each stored operation drop one
 * reference from every argument object and free its argument vector, then
 * free the operations array itself.
 *
 * On return the array is back in the state produced by redisOpArrayInit()
 * (numops == 0, ops == NULL), so it can be reused or freed again without
 * touching already released memory. */
void redisOpArrayFree(redisOpArray *oa) {
    while(oa->numops) {
        int j;
        redisOp *op;

        oa->numops--;
        op = oa->ops+oa->numops;
        for (j = 0; j < op->argc; j++)
            decrRefCount(op->argv[j]);
        zfree(op->argv);
    }
    zfree(oa->ops);
    oa->ops = NULL; /* Don't leave a dangling pointer behind. */
}
2282
2283 /* ====================== Commands lookup and execution ===================== */
2284
/* Return the command registered as 'name' in server.commands, or whatever
 * dictFetchValue() yields when there is no such entry. */
struct redisCommand *lookupCommand(sds name) {
    struct redisCommand *found = dictFetchValue(server.commands, name);

    return found;
}
2288
lookupCommandByCString(char * s)2289 struct redisCommand *lookupCommandByCString(char *s) {
2290 struct redisCommand *cmd;
2291 sds name = sdsnew(s);
2292
2293 cmd = dictFetchValue(server.commands, name);
2294 sdsfree(name);
2295 return cmd;
2296 }
2297
2298 /* Lookup the command in the current table, if not found also check in
2299 * the original table containing the original command names unaffected by
2300 * redis.conf rename-command statement.
2301 *
2302 * This is used by functions rewriting the argument vector such as
2303 * rewriteClientCommandVector() in order to set client->cmd pointer
2304 * correctly even if the command was renamed. */
lookupCommandOrOriginal(sds name)2305 struct redisCommand *lookupCommandOrOriginal(sds name) {
2306 struct redisCommand *cmd = dictFetchValue(server.commands, name);
2307
2308 if (!cmd) cmd = dictFetchValue(server.orig_commands,name);
2309 return cmd;
2310 }
2311
2312 /* Propagate the specified command (in the context of the specified database id)
2313 * to AOF and Slaves.
2314 *
2315 * flags are an xor between:
2316 * + PROPAGATE_NONE (no propagation of command at all)
2317 * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
2318 * + PROPAGATE_REPL (propagate into the replication link)
2319 *
2320 * This should not be used inside commands implementation. Use instead
2321 * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
2322 */
propagate(struct redisCommand * cmd,int dbid,robj ** argv,int argc,int flags)2323 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2324 int flags)
2325 {
2326 if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
2327 feedAppendOnlyFile(cmd,dbid,argv,argc);
2328 if (flags & PROPAGATE_REPL)
2329 replicationFeedSlaves(server.slaves,dbid,argv,argc);
2330 }
2331
2332 /* Used inside commands to schedule the propagation of additional commands
2333 * after the current command is propagated to AOF / Replication.
2334 *
2335 * 'cmd' must be a pointer to the Redis command to replicate, dbid is the
2336 * database ID the command should be propagated into.
2337 * Arguments of the command to propagte are passed as an array of redis
2338 * objects pointers of len 'argc', using the 'argv' vector.
2339 *
2340 * The function does not take a reference to the passed 'argv' vector,
2341 * so it is up to the caller to release the passed argv (but it is usually
2342 * stack allocated). The function autoamtically increments ref count of
2343 * passed objects, so the caller does not need to. */
alsoPropagate(struct redisCommand * cmd,int dbid,robj ** argv,int argc,int target)2344 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2345 int target)
2346 {
2347 robj **argvcopy;
2348 int j;
2349
2350 if (server.loading) return; /* No propagation during loading. */
2351
2352 argvcopy = zmalloc(sizeof(robj*)*argc);
2353 for (j = 0; j < argc; j++) {
2354 argvcopy[j] = argv[j];
2355 incrRefCount(argv[j]);
2356 }
2357 redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target);
2358 }
2359
2360 /* It is possible to call the function forceCommandPropagation() inside a
2361 * Redis command implementation in order to to force the propagation of a
2362 * specific command execution into AOF / Replication. */
forceCommandPropagation(client * c,int flags)2363 void forceCommandPropagation(client *c, int flags) {
2364 if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL;
2365 if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
2366 }
2367
2368 /* Avoid that the executed command is propagated at all. This way we
2369 * are free to just propagate what we want using the alsoPropagate()
2370 * API. */
preventCommandPropagation(client * c)2371 void preventCommandPropagation(client *c) {
2372 c->flags |= CLIENT_PREVENT_PROP;
2373 }
2374
2375 /* AOF specific version of preventCommandPropagation(). */
preventCommandAOF(client * c)2376 void preventCommandAOF(client *c) {
2377 c->flags |= CLIENT_PREVENT_AOF_PROP;
2378 }
2379
2380 /* Replication specific version of preventCommandPropagation(). */
preventCommandReplication(client * c)2381 void preventCommandReplication(client *c) {
2382 c->flags |= CLIENT_PREVENT_REPL_PROP;
2383 }
2384
2385 /* Call() is the core of Redis execution of a command.
2386 *
2387 * The following flags can be passed:
2388 * CMD_CALL_NONE No flags.
2389 * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed.
2390 * CMD_CALL_STATS Populate command stats.
2391 * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset
2392 * or if the client flags are forcing propagation.
2393 * CMD_CALL_PROPAGATE_REPL Send command to salves if it modified the dataset
2394 * or if the client flags are forcing propagation.
2395 * CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL.
2396 * CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE.
2397 *
2398 * The exact propagation behavior depends on the client flags.
2399 * Specifically:
2400 *
2401 * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
2402 * and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
2403 * in the call flags, then the command is propagated even if the
2404 * dataset was not affected by the command.
2405 * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
2406 * are set, the propagation into AOF or to slaves is not performed even
2407 * if the command modified the dataset.
2408 *
2409 * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
2410 * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
2411 * slaves propagation will never occur.
2412 *
2413 * Client flags are modified by the implementation of a given command
2414 * using the following API:
2415 *
2416 * forceCommandPropagation(client *c, int flags);
2417 * preventCommandPropagation(client *c);
2418 * preventCommandAOF(client *c);
2419 * preventCommandReplication(client *c);
2420 *
2421 */
call(client * c,int flags)2422 void call(client *c, int flags) {
2423 long long dirty, start, duration;
2424 int client_old_flags = c->flags;
2425 struct redisCommand *real_cmd = c->cmd;
2426
2427 /* Sent the command to clients in MONITOR mode, only if the commands are
2428 * not generated from reading an AOF. */
2429 if (listLength(server.monitors) &&
2430 !server.loading &&
2431 !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
2432 {
2433 replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
2434 }
2435
2436 /* Initialization: clear the flags that must be set by the command on
2437 * demand, and initialize the array for additional commands propagation. */
2438 c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2439 redisOpArray prev_also_propagate = server.also_propagate;
2440 redisOpArrayInit(&server.also_propagate);
2441
2442 /* Call the command. */
2443 dirty = server.dirty;
2444 start = ustime();
2445 c->cmd->proc(c);
2446 duration = ustime()-start;
2447 dirty = server.dirty-dirty;
2448 if (dirty < 0) dirty = 0;
2449
2450 /* When EVAL is called loading the AOF we don't want commands called
2451 * from Lua to go into the slowlog or to populate statistics. */
2452 if (server.loading && c->flags & CLIENT_LUA)
2453 flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
2454
2455 /* If the caller is Lua, we want to force the EVAL caller to propagate
2456 * the script if the command flag or client flag are forcing the
2457 * propagation. */
2458 if (c->flags & CLIENT_LUA && server.lua_caller) {
2459 if (c->flags & CLIENT_FORCE_REPL)
2460 server.lua_caller->flags |= CLIENT_FORCE_REPL;
2461 if (c->flags & CLIENT_FORCE_AOF)
2462 server.lua_caller->flags |= CLIENT_FORCE_AOF;
2463 }
2464
2465 /* Log the command into the Slow log if needed, and populate the
2466 * per-command statistics that we show in INFO commandstats. */
2467 if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
2468 char *latency_event = (c->cmd->flags & CMD_FAST) ?
2469 "fast-command" : "command";
2470 latencyAddSampleIfNeeded(latency_event,duration/1000);
2471 slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
2472 }
2473 if (flags & CMD_CALL_STATS) {
2474 /* use the real command that was executed (cmd and lastamc) may be
2475 * different, in case of MULTI-EXEC or re-written commands such as
2476 * EXPIRE, GEOADD, etc. */
2477 real_cmd->microseconds += duration;
2478 real_cmd->calls++;
2479 }
2480
2481 /* Propagate the command into the AOF and replication link */
2482 if (flags & CMD_CALL_PROPAGATE &&
2483 (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
2484 {
2485 int propagate_flags = PROPAGATE_NONE;
2486
2487 /* Check if the command operated changes in the data set. If so
2488 * set for replication / AOF propagation. */
2489 if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
2490
2491 /* If the client forced AOF / replication of the command, set
2492 * the flags regardless of the command effects on the data set. */
2493 if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
2494 if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
2495
2496 /* However prevent AOF / replication propagation if the command
2497 * implementations called preventCommandPropagation() or similar,
2498 * or if we don't have the call() flags to do so. */
2499 if (c->flags & CLIENT_PREVENT_REPL_PROP ||
2500 !(flags & CMD_CALL_PROPAGATE_REPL))
2501 propagate_flags &= ~PROPAGATE_REPL;
2502 if (c->flags & CLIENT_PREVENT_AOF_PROP ||
2503 !(flags & CMD_CALL_PROPAGATE_AOF))
2504 propagate_flags &= ~PROPAGATE_AOF;
2505
2506 /* Call propagate() only if at least one of AOF / replication
2507 * propagation is needed. Note that modules commands handle replication
2508 * in an explicit way, so we never replicate them automatically. */
2509 if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
2510 propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
2511 }
2512
2513 /* Restore the old replication flags, since call() can be executed
2514 * recursively. */
2515 c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2516 c->flags |= client_old_flags &
2517 (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2518
2519 /* Handle the alsoPropagate() API to handle commands that want to propagate
2520 * multiple separated commands. Note that alsoPropagate() is not affected
2521 * by CLIENT_PREVENT_PROP flag. */
2522 if (server.also_propagate.numops) {
2523 int j;
2524 redisOp *rop;
2525
2526 if (flags & CMD_CALL_PROPAGATE) {
2527 for (j = 0; j < server.also_propagate.numops; j++) {
2528 rop = &server.also_propagate.ops[j];
2529 int target = rop->target;
2530 /* Whatever the command wish is, we honor the call() flags. */
2531 if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
2532 if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
2533 if (target)
2534 propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
2535 }
2536 }
2537 redisOpArrayFree(&server.also_propagate);
2538 }
2539 server.also_propagate = prev_also_propagate;
2540 server.stat_numcommands++;
2541 }
2542
2543 /* If this function gets called we already read a whole
2544 * command, arguments are in the client argv/argc fields.
2545 * processCommand() execute the command or prepare the
2546 * server for a bulk read from the client.
2547 *
2548 * If C_OK is returned the client is still alive and valid and
2549 * other operations can be performed by the caller. Otherwise
2550 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
processCommand(client * c)2551 int processCommand(client *c) {
2552 moduleCallCommandFilters(c);
2553
2554 /* The QUIT command is handled separately. Normal command procs will
2555 * go through checking for replication and QUIT will cause trouble
2556 * when FORCE_REPLICATION is enabled and would be implemented in
2557 * a regular command proc. */
2558 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
2559 addReply(c,shared.ok);
2560 c->flags |= CLIENT_CLOSE_AFTER_REPLY;
2561 return C_ERR;
2562 }
2563
2564 /* Now lookup the command and check ASAP about trivial error conditions
2565 * such as wrong arity, bad command name and so forth. */
2566 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
2567 if (!c->cmd) {
2568 flagTransaction(c);
2569 sds args = sdsempty();
2570 int i;
2571 for (i=1; i < c->argc && sdslen(args) < 128; i++)
2572 args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
2573 addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
2574 (char*)c->argv[0]->ptr, args);
2575 sdsfree(args);
2576 return C_OK;
2577 } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
2578 (c->argc < -c->cmd->arity)) {
2579 flagTransaction(c);
2580 addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2581 c->cmd->name);
2582 return C_OK;
2583 }
2584
2585 /* Check if the user is authenticated */
2586 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
2587 {
2588 flagTransaction(c);
2589 addReply(c,shared.noautherr);
2590 return C_OK;
2591 }
2592
2593 /* If cluster is enabled perform the cluster redirection here.
2594 * However we don't perform the redirection if:
2595 * 1) The sender of this command is our master.
2596 * 2) The command has no key arguments. */
2597 if (server.cluster_enabled &&
2598 !(c->flags & CLIENT_MASTER) &&
2599 !(c->flags & CLIENT_LUA &&
2600 server.lua_caller->flags & CLIENT_MASTER) &&
2601 !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
2602 c->cmd->proc != execCommand))
2603 {
2604 int hashslot;
2605 int error_code;
2606 clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
2607 &hashslot,&error_code);
2608 if (n == NULL || n != server.cluster->myself) {
2609 if (c->cmd->proc == execCommand) {
2610 discardTransaction(c);
2611 } else {
2612 flagTransaction(c);
2613 }
2614 clusterRedirectClient(c,n,hashslot,error_code);
2615 return C_OK;
2616 }
2617 }
2618
2619 /* Handle the maxmemory directive.
2620 *
2621 * Note that we do not want to reclaim memory if we are here re-entering
2622 * the event loop since there is a busy Lua script running in timeout
2623 * condition, to avoid mixing the propagation of scripts with the
2624 * propagation of DELs due to eviction. */
2625 if (server.maxmemory && !server.lua_timedout) {
2626 int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
2627 /* freeMemoryIfNeeded may flush slave output buffers. This may result
2628 * into a slave, that may be the active client, to be freed. */
2629 if (server.current_client == NULL) return C_ERR;
2630
2631 /* It was impossible to free enough memory, and the command the client
2632 * is trying to execute is denied during OOM conditions or the client
2633 * is in MULTI/EXEC context? Error. */
2634 if (out_of_memory &&
2635 (c->cmd->flags & CMD_DENYOOM ||
2636 (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {
2637 flagTransaction(c);
2638 addReply(c, shared.oomerr);
2639 return C_OK;
2640 }
2641 }
2642
2643 /* Don't accept write commands if there are problems persisting on disk
2644 * and if this is a master instance. */
2645 int deny_write_type = writeCommandsDeniedByDiskError();
2646 if (deny_write_type != DISK_ERROR_TYPE_NONE &&
2647 server.masterhost == NULL &&
2648 (c->cmd->flags & CMD_WRITE ||
2649 c->cmd->proc == pingCommand))
2650 {
2651 flagTransaction(c);
2652 if (deny_write_type == DISK_ERROR_TYPE_RDB)
2653 addReply(c, shared.bgsaveerr);
2654 else
2655 addReplySds(c,
2656 sdscatprintf(sdsempty(),
2657 "-MISCONF Errors writing to the AOF file: %s\r\n",
2658 strerror(server.aof_last_write_errno)));
2659 return C_OK;
2660 }
2661
2662 /* Don't accept write commands if there are not enough good slaves and
2663 * user configured the min-slaves-to-write option. */
2664 if (server.masterhost == NULL &&
2665 server.repl_min_slaves_to_write &&
2666 server.repl_min_slaves_max_lag &&
2667 c->cmd->flags & CMD_WRITE &&
2668 server.repl_good_slaves_count < server.repl_min_slaves_to_write)
2669 {
2670 flagTransaction(c);
2671 addReply(c, shared.noreplicaserr);
2672 return C_OK;
2673 }
2674
2675 /* Don't accept write commands if this is a read only slave. But
2676 * accept write commands if this is our master. */
2677 if (server.masterhost && server.repl_slave_ro &&
2678 !(c->flags & CLIENT_MASTER) &&
2679 c->cmd->flags & CMD_WRITE)
2680 {
2681 addReply(c, shared.roslaveerr);
2682 return C_OK;
2683 }
2684
2685 /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
2686 if (c->flags & CLIENT_PUBSUB &&
2687 c->cmd->proc != pingCommand &&
2688 c->cmd->proc != subscribeCommand &&
2689 c->cmd->proc != unsubscribeCommand &&
2690 c->cmd->proc != psubscribeCommand &&
2691 c->cmd->proc != punsubscribeCommand) {
2692 addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
2693 return C_OK;
2694 }
2695
2696 /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
2697 * when slave-serve-stale-data is no and we are a slave with a broken
2698 * link with master. */
2699 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
2700 server.repl_serve_stale_data == 0 &&
2701 !(c->cmd->flags & CMD_STALE))
2702 {
2703 flagTransaction(c);
2704 addReply(c, shared.masterdownerr);
2705 return C_OK;
2706 }
2707
2708 /* Loading DB? Return an error if the command has not the
2709 * CMD_LOADING flag. */
2710 if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
2711 addReply(c, shared.loadingerr);
2712 return C_OK;
2713 }
2714
2715 /* Lua script too slow? Only allow a limited number of commands. */
2716 if (server.lua_timedout &&
2717 c->cmd->proc != authCommand &&
2718 c->cmd->proc != replconfCommand &&
2719 !(c->cmd->proc == shutdownCommand &&
2720 c->argc == 2 &&
2721 tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
2722 !(c->cmd->proc == scriptCommand &&
2723 c->argc == 2 &&
2724 tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
2725 {
2726 flagTransaction(c);
2727 addReply(c, shared.slowscripterr);
2728 return C_OK;
2729 }
2730
2731 /* Exec the command */
2732 if (c->flags & CLIENT_MULTI &&
2733 c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
2734 c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
2735 {
2736 queueMultiCommand(c);
2737 addReply(c,shared.queued);
2738 } else {
2739 call(c,CMD_CALL_FULL);
2740 c->woff = server.master_repl_offset;
2741 if (listLength(server.ready_keys))
2742 handleClientsBlockedOnKeys();
2743 }
2744 return C_OK;
2745 }
2746
2747 /*================================== Shutdown =============================== */
2748
2749 /* Close listening sockets. Also unlink the unix domain socket if
2750 * unlink_unix_socket is non-zero. */
closeListeningSockets(int unlink_unix_socket)2751 void closeListeningSockets(int unlink_unix_socket) {
2752 int j;
2753
2754 for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]);
2755 if (server.sofd != -1) close(server.sofd);
2756 if (server.cluster_enabled)
2757 for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]);
2758 if (unlink_unix_socket && server.unixsocket) {
2759 serverLog(LL_NOTICE,"Removing the unix socket file.");
2760 unlink(server.unixsocket); /* don't care if this fails */
2761 }
2762 }
2763
/* Reset the CPU affinity of the calling thread; meant to be called as soon
 * as a new process is fork()ed, since the new process would otherwise use
 * the same CPU core as the redis server.
 *
 * The new affinity mask is the complement of the current one: every CPU
 * the thread is currently allowed to run on is excluded, every other CPU
 * below CPU_SETSIZE is allowed. If 'name' is not NULL it is also set as
 * the thread name. A failure to set the affinity is only logged.
 *
 * Without HAVE_FF_KQUEUE this function does nothing. All the locals live
 * inside the guarded section so that a build without the feature neither
 * depends on the cpu_set_t / pthread affinity extensions nor carries
 * unused variables. */
void resetCpuAffinity(const char* name)
{
#ifdef HAVE_FF_KQUEUE
    cpu_set_t cpuset_frm, cpuset_to;
    pthread_t thread = pthread_self();
    int j, s;

    CPU_ZERO(&cpuset_frm);
    CPU_ZERO(&cpuset_to);

    /* NOTE(review): the result is not checked; presumably a failure leaves
     * 'cpuset_frm' empty, so that every CPU gets allowed below. */
    pthread_getaffinity_np(thread, sizeof(cpu_set_t), &cpuset_frm);
    for (j = 0; j < CPU_SETSIZE; j++) {
        if (CPU_ISSET(j, &cpuset_frm))
            continue;
        CPU_SET(j, &cpuset_to);
    }

    s = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset_to);
    if (s != 0)
        serverLog(LL_WARNING,"set cpu affinity, failed.");
    if (name != NULL)
        pthread_setname_np(thread, name);
#else
    (void)name; /* Nothing to do without F-Stack support. */
#endif
}
2793
prepareForShutdown(int flags)2794 int prepareForShutdown(int flags) {
2795 int save = flags & SHUTDOWN_SAVE;
2796 int nosave = flags & SHUTDOWN_NOSAVE;
2797
2798 serverLog(LL_WARNING,"User requested shutdown...");
2799
2800 /* Kill all the Lua debugger forked sessions. */
2801 ldbKillForkedSessions();
2802
2803 /* Kill the saving child if there is a background saving in progress.
2804 We want to avoid race conditions, for instance our saving child may
2805 overwrite the synchronous saving did by SHUTDOWN. */
2806 if (server.rdb_child_pid != -1) {
2807 serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
2808 kill(server.rdb_child_pid,SIGUSR1);
2809 rdbRemoveTempFile(server.rdb_child_pid);
2810 }
2811
2812 if (server.aof_state != AOF_OFF) {
2813 /* Kill the AOF saving child as the AOF we already have may be longer
2814 * but contains the full dataset anyway. */
2815 if (server.aof_child_pid != -1) {
2816 /* If we have AOF enabled but haven't written the AOF yet, don't
2817 * shutdown or else the dataset will be lost. */
2818 if (server.aof_state == AOF_WAIT_REWRITE) {
2819 serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
2820 return C_ERR;
2821 }
2822 serverLog(LL_WARNING,
2823 "There is a child rewriting the AOF. Killing it!");
2824 kill(server.aof_child_pid,SIGUSR1);
2825 }
2826 /* Append only file: flush buffers and fsync() the AOF at exit */
2827 serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
2828 flushAppendOnlyFile(1);
2829 redis_fsync(server.aof_fd);
2830 }
2831
2832 /* Create a new RDB file before exiting. */
2833 if ((server.saveparamslen > 0 && !nosave) || save) {
2834 serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
2835 /* Snapshotting. Perform a SYNC SAVE and exit */
2836 rdbSaveInfo rsi, *rsiptr;
2837 rsiptr = rdbPopulateSaveInfo(&rsi);
2838 if (rdbSave(server.rdb_filename,rsiptr) != C_OK) {
2839 /* Ooops.. error saving! The best we can do is to continue
2840 * operating. Note that if there was a background saving process,
2841 * in the next cron() Redis will be notified that the background
2842 * saving aborted, handling special stuff like slaves pending for
2843 * synchronization... */
2844 serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
2845 return C_ERR;
2846 }
2847 }
2848
2849 /* Remove the pid file if possible and needed. */
2850 if (server.daemonize || server.pidfile) {
2851 serverLog(LL_NOTICE,"Removing the pid file.");
2852 unlink(server.pidfile);
2853 }
2854
2855 /* Best effort flush of slave output buffers, so that we hopefully
2856 * send them pending writes. */
2857 flushSlavesOutputBuffers();
2858
2859 /* Close the listening sockets. Apparently this allows faster restarts. */
2860 closeListeningSockets(1);
2861 serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
2862 server.sentinel_mode ? "Sentinel" : "Redis");
2863 return C_OK;
2864 }
2865
2866 /*================================== Commands =============================== */
2867
2868 /* Sometimes Redis cannot accept write commands because there is a perstence
2869 * error with the RDB or AOF file, and Redis is configured in order to stop
2870 * accepting writes in such situation. This function returns if such a
2871 * condition is active, and the type of the condition.
2872 *
2873 * Function return values:
2874 *
2875 * DISK_ERROR_TYPE_NONE: No problems, we can accept writes.
2876 * DISK_ERROR_TYPE_AOF: Don't accept writes: AOF errors.
2877 * DISK_ERROR_TYPE_RDB: Don't accept writes: RDB errors.
2878 */
writeCommandsDeniedByDiskError(void)2879 int writeCommandsDeniedByDiskError(void) {
2880 if (server.stop_writes_on_bgsave_err &&
2881 server.saveparamslen > 0 &&
2882 server.lastbgsave_status == C_ERR)
2883 {
2884 return DISK_ERROR_TYPE_RDB;
2885 } else if (server.aof_state != AOF_OFF &&
2886 server.aof_last_write_status == C_ERR)
2887 {
2888 return DISK_ERROR_TYPE_AOF;
2889 } else {
2890 return DISK_ERROR_TYPE_NONE;
2891 }
2892 }
2893
/* Return zero if strings are the same, non-zero if they are not.
 * The comparison is performed in a way that prevents an attacker to obtain
 * information about the nature of the strings just monitoring the execution
 * time of the function.
 *
 * Both strings are copied into zero padded buffers of
 * CONFIG_AUTHPASS_MAX_LEN bytes that are then compared in full, so the
 * comparison itself always does the same amount of work. Strings longer
 * than the buffers are reported as different without being compared.
 */
int time_independent_strcmp(char *a, char *b) {
    char bufa[CONFIG_AUTHPASS_MAX_LEN], bufb[CONFIG_AUTHPASS_MAX_LEN];
    /* The following two strlen perform len(a) + len(b) operations where
     * either a or b are fixed (our password) length, and the difference is
     * only relative to the length of the user provided string, so no
     * information leak is possible in the following two lines of code.
     *
     * The lengths are kept as size_t: a narrower type would truncate the
     * length of a huge string and defeat the size check below. */
    size_t alen = strlen(a);
    size_t blen = strlen(b);
    size_t j;
    int diff = 0;

    /* We can't compare strings longer than our static buffers.
     * Note that this will never pass the first test in practical circumstances
     * so there is no info leak. */
    if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;

    memset(bufa,0,sizeof(bufa)); /* Constant time. */
    memset(bufb,0,sizeof(bufb)); /* Constant time. */
    /* Again the time of the following two copies is proportional to
     * len(a) + len(b) so no info is leaked. */
    memcpy(bufa,a,alen);
    memcpy(bufb,b,blen);

    /* Always compare all the chars in the two buffers without
     * conditional expressions. */
    for (j = 0; j < sizeof(bufa); j++) {
        diff |= (bufa[j] ^ bufb[j]);
    }
    /* Length must be equal as well. Both lengths fit the buffers here, so
     * the XOR fits an int. */
    diff |= (int)(alen ^ blen);
    return diff; /* If zero strings are the same. */
}
2935
/* The AUTH command: compare the first argument against server.requirepass
 * with time_independent_strcmp() and record the outcome in
 * c->authenticated. Replies with an error when no password is configured
 * or when the password does not match. */
void authCommand(client *c) {
    if (!server.requirepass) {
        addReplyError(c,"Client sent AUTH, but no password is set");
        return;
    }

    if (time_independent_strcmp(c->argv[1]->ptr, server.requirepass) != 0) {
        c->authenticated = 0;
        addReplyError(c,"invalid password");
        return;
    }

    c->authenticated = 1;
    addReply(c,shared.ok);
}
2947
2948 /* The PING command. It works in a different way if the client is in
2949 * in Pub/Sub mode. */
pingCommand(client * c)2950 void pingCommand(client *c) {
2951 /* The command takes zero or one arguments. */
2952 if (c->argc > 2) {
2953 addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2954 c->cmd->name);
2955 return;
2956 }
2957
2958 if (c->flags & CLIENT_PUBSUB) {
2959 addReply(c,shared.mbulkhdr[2]);
2960 addReplyBulkCBuffer(c,"pong",4);
2961 if (c->argc == 1)
2962 addReplyBulkCBuffer(c,"",0);
2963 else
2964 addReplyBulk(c,c->argv[1]);
2965 } else {
2966 if (c->argc == 1)
2967 addReply(c,shared.pong);
2968 else
2969 addReplyBulk(c,c->argv[1]);
2970 }
2971 }
2972
/* The ECHO command: reply with the first argument as a bulk string. */
void echoCommand(client *c) {
    robj *message = c->argv[1];

    addReplyBulk(c,message);
}
2976
/* The TIME command: reply with a two elements multi bulk holding the
 * seconds and the microseconds fields returned by gettimeofday(). */
void timeCommand(client *c) {
    struct timeval now;

    /* gettimeofday() can only fail if &now is a bad address so we
     * don't check for errors. */
    gettimeofday(&now,NULL);

    addReplyMultiBulkLen(c,2);
    addReplyBulkLongLong(c,now.tv_sec);
    addReplyBulkLongLong(c,now.tv_usec);
}
2987
2988 /* Helper function for addReplyCommand() to output flags. */
addReplyCommandFlag(client * c,struct redisCommand * cmd,int f,char * reply)2989 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) {
2990 if (cmd->flags & f) {
2991 addReplyStatus(c, reply);
2992 return 1;
2993 }
2994 return 0;
2995 }
2996
2997 /* Output the representation of a Redis command. Used by the COMMAND command. */
addReplyCommand(client * c,struct redisCommand * cmd)2998 void addReplyCommand(client *c, struct redisCommand *cmd) {
2999 if (!cmd) {
3000 addReply(c, shared.nullbulk);
3001 } else {
3002 /* We are adding: command name, arg count, flags, first, last, offset */
3003 addReplyMultiBulkLen(c, 6);
3004 addReplyBulkCString(c, cmd->name);
3005 addReplyLongLong(c, cmd->arity);
3006
3007 int flagcount = 0;
3008 void *flaglen = addDeferredMultiBulkLength(c);
3009 flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write");
3010 flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly");
3011 flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom");
3012 flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin");
3013 flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub");
3014 flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript");
3015 flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random");
3016 flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script");
3017 flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading");
3018 flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale");
3019 flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor");
3020 flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
3021 flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
3022 if ((cmd->getkeys_proc && !(cmd->flags & CMD_MODULE)) ||
3023 cmd->flags & CMD_MODULE_GETKEYS)
3024 {
3025 addReplyStatus(c, "movablekeys");
3026 flagcount += 1;
3027 }
3028 setDeferredMultiBulkLength(c, flaglen, flagcount);
3029
3030 addReplyLongLong(c, cmd->firstkey);
3031 addReplyLongLong(c, cmd->lastkey);
3032 addReplyLongLong(c, cmd->keystep);
3033 }
3034 }
3035
3036 /* COMMAND <subcommand> <args> */
commandCommand(client * c)3037 void commandCommand(client *c) {
3038 dictIterator *di;
3039 dictEntry *de;
3040
3041 if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
3042 const char *help[] = {
3043 "(no subcommand) -- Return details about all Redis commands.",
3044 "COUNT -- Return the total number of commands in this Redis server.",
3045 "GETKEYS <full-command> -- Return the keys from a full Redis command.",
3046 "INFO [command-name ...] -- Return details about multiple Redis commands.",
3047 NULL
3048 };
3049 addReplyHelp(c, help);
3050 } else if (c->argc == 1) {
3051 addReplyMultiBulkLen(c, dictSize(server.commands));
3052 di = dictGetIterator(server.commands);
3053 while ((de = dictNext(di)) != NULL) {
3054 addReplyCommand(c, dictGetVal(de));
3055 }
3056 dictReleaseIterator(di);
3057 } else if (!strcasecmp(c->argv[1]->ptr, "info")) {
3058 int i;
3059 addReplyMultiBulkLen(c, c->argc-2);
3060 for (i = 2; i < c->argc; i++) {
3061 addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr));
3062 }
3063 } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) {
3064 addReplyLongLong(c, dictSize(server.commands));
3065 } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) {
3066 struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
3067 int *keys, numkeys, j;
3068
3069 if (!cmd) {
3070 addReplyError(c,"Invalid command specified");
3071 return;
3072 } else if (cmd->getkeys_proc == NULL && cmd->firstkey == 0) {
3073 addReplyError(c,"The command has no key arguments");
3074 return;
3075 } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) ||
3076 ((c->argc-2) < -cmd->arity))
3077 {
3078 addReplyError(c,"Invalid number of arguments specified for command");
3079 return;
3080 }
3081
3082 keys = getKeysFromCommand(cmd,c->argv+2,c->argc-2,&numkeys);
3083 if (!keys) {
3084 addReplyError(c,"Invalid arguments specified for command");
3085 } else {
3086 addReplyMultiBulkLen(c,numkeys);
3087 for (j = 0; j < numkeys; j++) addReplyBulk(c,c->argv[keys[j]+2]);
3088 getKeysFreeResult(keys);
3089 }
3090 } else {
3091 addReplySubcommandSyntaxError(c);
3092 }
3093 }
3094
/* Render the byte count 'n' into 's' as a short human readable string.
 *
 * Counts below 1024 are printed as "<n>B". Larger counts are scaled to the
 * biggest binary unit (K, M, G, T, P) that does not exceed them and printed
 * with two decimals, e.g. "1.50K" or "2.00G". Counts of 1024^6 bytes or more
 * fall back to the plain "<n>B" form.
 *
 * The output is written with sprintf(): 's' must be large enough to hold
 * the result, no bound is checked here. */
void bytesToHuman(char *s, unsigned long long n) {
    static const char suffix[] = "KMGTP";
    unsigned long long unit = 1024; /* Size in bytes of the unit tried. */
    int u;

    if (n >= unit) {
        for (u = 0; suffix[u] != '\0'; u++) {
            /* 'n' is expressed in this unit when it is smaller than one
             * item of the next, bigger unit. */
            if (n < unit*1024) {
                sprintf(s,"%.2f%c",(double)n/unit,suffix[u]);
                return;
            }
            unit *= 1024;
        }
    }

    /* Either less than 1K, or too big for the units we know about. */
    sprintf(s,"%lluB",n);
}
3123
3124 /* Create the string returned by the INFO command. This is decoupled
3125 * by the INFO command itself as we need to report the same information
3126 * on memory corruption problems. */
genRedisInfoString(char * section)3127 sds genRedisInfoString(char *section) {
3128 sds info = sdsempty();
3129 time_t uptime = server.unixtime-server.stat_starttime;
3130 int j;
3131 struct rusage self_ru, c_ru;
3132 int allsections = 0, defsections = 0;
3133 int sections = 0;
3134
3135 if (section == NULL) section = "default";
3136 allsections = strcasecmp(section,"all") == 0;
3137 defsections = strcasecmp(section,"default") == 0;
3138
3139 getrusage(RUSAGE_SELF, &self_ru);
3140 getrusage(RUSAGE_CHILDREN, &c_ru);
3141
3142 /* Server */
3143 if (allsections || defsections || !strcasecmp(section,"server")) {
3144 static int call_uname = 1;
3145 static struct utsname name;
3146 char *mode;
3147
3148 if (server.cluster_enabled) mode = "cluster";
3149 else if (server.sentinel_mode) mode = "sentinel";
3150 else mode = "standalone";
3151
3152 if (sections++) info = sdscat(info,"\r\n");
3153
3154 if (call_uname) {
3155 /* Uname can be slow and is always the same output. Cache it. */
3156 uname(&name);
3157 call_uname = 0;
3158 }
3159
3160 unsigned int lruclock;
3161 atomicGet(server.lruclock,lruclock);
3162 info = sdscatprintf(info,
3163 "# Server\r\n"
3164 "redis_version:%s\r\n"
3165 "redis_git_sha1:%s\r\n"
3166 "redis_git_dirty:%d\r\n"
3167 "redis_build_id:%llx\r\n"
3168 "redis_mode:%s\r\n"
3169 "os:%s %s %s\r\n"
3170 "arch_bits:%d\r\n"
3171 "multiplexing_api:%s\r\n"
3172 "atomicvar_api:%s\r\n"
3173 "gcc_version:%d.%d.%d\r\n"
3174 "process_id:%ld\r\n"
3175 "run_id:%s\r\n"
3176 "tcp_port:%d\r\n"
3177 "uptime_in_seconds:%jd\r\n"
3178 "uptime_in_days:%jd\r\n"
3179 "hz:%d\r\n"
3180 "configured_hz:%d\r\n"
3181 "lru_clock:%ld\r\n"
3182 "executable:%s\r\n"
3183 "config_file:%s\r\n",
3184 REDIS_VERSION,
3185 redisGitSHA1(),
3186 strtol(redisGitDirty(),NULL,10) > 0,
3187 (unsigned long long) redisBuildId(),
3188 mode,
3189 name.sysname, name.release, name.machine,
3190 server.arch_bits,
3191 aeGetApiName(),
3192 REDIS_ATOMIC_API,
3193 #ifdef __GNUC__
3194 __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
3195 #else
3196 0,0,0,
3197 #endif
3198 (long) getpid(),
3199 server.runid,
3200 server.port,
3201 (intmax_t)uptime,
3202 (intmax_t)(uptime/(3600*24)),
3203 server.hz,
3204 server.config_hz,
3205 (unsigned long) lruclock,
3206 server.executable ? server.executable : "",
3207 server.configfile ? server.configfile : "");
3208 }
3209
3210 /* Clients */
3211 if (allsections || defsections || !strcasecmp(section,"clients")) {
3212 size_t maxin, maxout;
3213 getExpansiveClientsInfo(&maxin,&maxout);
3214 if (sections++) info = sdscat(info,"\r\n");
3215 info = sdscatprintf(info,
3216 "# Clients\r\n"
3217 "connected_clients:%lu\r\n"
3218 "client_recent_max_input_buffer:%zu\r\n"
3219 "client_recent_max_output_buffer:%zu\r\n"
3220 "blocked_clients:%d\r\n",
3221 listLength(server.clients)-listLength(server.slaves),
3222 maxin, maxout,
3223 server.blocked_clients);
3224 }
3225
3226 /* Memory */
3227 if (allsections || defsections || !strcasecmp(section,"memory")) {
3228 char hmem[64];
3229 char peak_hmem[64];
3230 char total_system_hmem[64];
3231 char used_memory_lua_hmem[64];
3232 char used_memory_scripts_hmem[64];
3233 char used_memory_rss_hmem[64];
3234 char maxmemory_hmem[64];
3235 size_t zmalloc_used = zmalloc_used_memory();
3236 size_t total_system_mem = server.system_memory_size;
3237 const char *evict_policy = evictPolicyToString();
3238 long long memory_lua = (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024;
3239 struct redisMemOverhead *mh = getMemoryOverheadData();
3240
3241 /* Peak memory is updated from time to time by serverCron() so it
3242 * may happen that the instantaneous value is slightly bigger than
3243 * the peak value. This may confuse users, so we update the peak
3244 * if found smaller than the current memory usage. */
3245 if (zmalloc_used > server.stat_peak_memory)
3246 server.stat_peak_memory = zmalloc_used;
3247
3248 bytesToHuman(hmem,zmalloc_used);
3249 bytesToHuman(peak_hmem,server.stat_peak_memory);
3250 bytesToHuman(total_system_hmem,total_system_mem);
3251 bytesToHuman(used_memory_lua_hmem,memory_lua);
3252 bytesToHuman(used_memory_scripts_hmem,mh->lua_caches);
3253 bytesToHuman(used_memory_rss_hmem,server.cron_malloc_stats.process_rss);
3254 bytesToHuman(maxmemory_hmem,server.maxmemory);
3255
3256 if (sections++) info = sdscat(info,"\r\n");
3257 info = sdscatprintf(info,
3258 "# Memory\r\n"
3259 "used_memory:%zu\r\n"
3260 "used_memory_human:%s\r\n"
3261 "used_memory_rss:%zu\r\n"
3262 "used_memory_rss_human:%s\r\n"
3263 "used_memory_peak:%zu\r\n"
3264 "used_memory_peak_human:%s\r\n"
3265 "used_memory_peak_perc:%.2f%%\r\n"
3266 "used_memory_overhead:%zu\r\n"
3267 "used_memory_startup:%zu\r\n"
3268 "used_memory_dataset:%zu\r\n"
3269 "used_memory_dataset_perc:%.2f%%\r\n"
3270 "allocator_allocated:%zu\r\n"
3271 "allocator_active:%zu\r\n"
3272 "allocator_resident:%zu\r\n"
3273 "total_system_memory:%lu\r\n"
3274 "total_system_memory_human:%s\r\n"
3275 "used_memory_lua:%lld\r\n"
3276 "used_memory_lua_human:%s\r\n"
3277 "used_memory_scripts:%lld\r\n"
3278 "used_memory_scripts_human:%s\r\n"
3279 "number_of_cached_scripts:%lu\r\n"
3280 "maxmemory:%lld\r\n"
3281 "maxmemory_human:%s\r\n"
3282 "maxmemory_policy:%s\r\n"
3283 "allocator_frag_ratio:%.2f\r\n"
3284 "allocator_frag_bytes:%zu\r\n"
3285 "allocator_rss_ratio:%.2f\r\n"
3286 "allocator_rss_bytes:%zd\r\n"
3287 "rss_overhead_ratio:%.2f\r\n"
3288 "rss_overhead_bytes:%zd\r\n"
3289 "mem_fragmentation_ratio:%.2f\r\n"
3290 "mem_fragmentation_bytes:%zd\r\n"
3291 "mem_not_counted_for_evict:%zu\r\n"
3292 "mem_replication_backlog:%zu\r\n"
3293 "mem_clients_slaves:%zu\r\n"
3294 "mem_clients_normal:%zu\r\n"
3295 "mem_aof_buffer:%zu\r\n"
3296 "mem_allocator:%s\r\n"
3297 "active_defrag_running:%d\r\n"
3298 "lazyfree_pending_objects:%zu\r\n",
3299 zmalloc_used,
3300 hmem,
3301 server.cron_malloc_stats.process_rss,
3302 used_memory_rss_hmem,
3303 server.stat_peak_memory,
3304 peak_hmem,
3305 mh->peak_perc,
3306 mh->overhead_total,
3307 mh->startup_allocated,
3308 mh->dataset,
3309 mh->dataset_perc,
3310 server.cron_malloc_stats.allocator_allocated,
3311 server.cron_malloc_stats.allocator_active,
3312 server.cron_malloc_stats.allocator_resident,
3313 (unsigned long)total_system_mem,
3314 total_system_hmem,
3315 memory_lua,
3316 used_memory_lua_hmem,
3317 (long long) mh->lua_caches,
3318 used_memory_scripts_hmem,
3319 dictSize(server.lua_scripts),
3320 server.maxmemory,
3321 maxmemory_hmem,
3322 evict_policy,
3323 mh->allocator_frag,
3324 mh->allocator_frag_bytes,
3325 mh->allocator_rss,
3326 mh->allocator_rss_bytes,
3327 mh->rss_extra,
3328 mh->rss_extra_bytes,
3329 mh->total_frag, /* this is the total RSS overhead, including fragmentation, */
3330 mh->total_frag_bytes, /* named so for backwards compatibility */
3331 freeMemoryGetNotCountedMemory(),
3332 mh->repl_backlog,
3333 mh->clients_slaves,
3334 mh->clients_normal,
3335 mh->aof_buffer,
3336 ZMALLOC_LIB,
3337 server.active_defrag_running,
3338 lazyfreeGetPendingObjectsCount()
3339 );
3340 freeMemoryOverheadData(mh);
3341 }
3342
3343 /* Persistence */
3344 if (allsections || defsections || !strcasecmp(section,"persistence")) {
3345 if (sections++) info = sdscat(info,"\r\n");
3346 info = sdscatprintf(info,
3347 "# Persistence\r\n"
3348 "loading:%d\r\n"
3349 "rdb_changes_since_last_save:%lld\r\n"
3350 "rdb_bgsave_in_progress:%d\r\n"
3351 "rdb_last_save_time:%jd\r\n"
3352 "rdb_last_bgsave_status:%s\r\n"
3353 "rdb_last_bgsave_time_sec:%jd\r\n"
3354 "rdb_current_bgsave_time_sec:%jd\r\n"
3355 "rdb_last_cow_size:%zu\r\n"
3356 "aof_enabled:%d\r\n"
3357 "aof_rewrite_in_progress:%d\r\n"
3358 "aof_rewrite_scheduled:%d\r\n"
3359 "aof_last_rewrite_time_sec:%jd\r\n"
3360 "aof_current_rewrite_time_sec:%jd\r\n"
3361 "aof_last_bgrewrite_status:%s\r\n"
3362 "aof_last_write_status:%s\r\n"
3363 "aof_last_cow_size:%zu\r\n",
3364 server.loading,
3365 server.dirty,
3366 server.rdb_child_pid != -1,
3367 (intmax_t)server.lastsave,
3368 (server.lastbgsave_status == C_OK) ? "ok" : "err",
3369 (intmax_t)server.rdb_save_time_last,
3370 (intmax_t)((server.rdb_child_pid == -1) ?
3371 -1 : time(NULL)-server.rdb_save_time_start),
3372 server.stat_rdb_cow_bytes,
3373 server.aof_state != AOF_OFF,
3374 server.aof_child_pid != -1,
3375 server.aof_rewrite_scheduled,
3376 (intmax_t)server.aof_rewrite_time_last,
3377 (intmax_t)((server.aof_child_pid == -1) ?
3378 -1 : time(NULL)-server.aof_rewrite_time_start),
3379 (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
3380 (server.aof_last_write_status == C_OK) ? "ok" : "err",
3381 server.stat_aof_cow_bytes);
3382
3383 if (server.aof_state != AOF_OFF) {
3384 info = sdscatprintf(info,
3385 "aof_current_size:%lld\r\n"
3386 "aof_base_size:%lld\r\n"
3387 "aof_pending_rewrite:%d\r\n"
3388 "aof_buffer_length:%zu\r\n"
3389 "aof_rewrite_buffer_length:%lu\r\n"
3390 "aof_pending_bio_fsync:%llu\r\n"
3391 "aof_delayed_fsync:%lu\r\n",
3392 (long long) server.aof_current_size,
3393 (long long) server.aof_rewrite_base_size,
3394 server.aof_rewrite_scheduled,
3395 sdslen(server.aof_buf),
3396 aofRewriteBufferSize(),
3397 bioPendingJobsOfType(BIO_AOF_FSYNC),
3398 server.aof_delayed_fsync);
3399 }
3400
3401 if (server.loading) {
3402 double perc;
3403 time_t eta, elapsed;
3404 off_t remaining_bytes = server.loading_total_bytes-
3405 server.loading_loaded_bytes;
3406
3407 perc = ((double)server.loading_loaded_bytes /
3408 (server.loading_total_bytes+1)) * 100;
3409
3410 elapsed = time(NULL)-server.loading_start_time;
3411 if (elapsed == 0) {
3412 eta = 1; /* A fake 1 second figure if we don't have
3413 enough info */
3414 } else {
3415 eta = (elapsed*remaining_bytes)/(server.loading_loaded_bytes+1);
3416 }
3417
3418 info = sdscatprintf(info,
3419 "loading_start_time:%jd\r\n"
3420 "loading_total_bytes:%llu\r\n"
3421 "loading_loaded_bytes:%llu\r\n"
3422 "loading_loaded_perc:%.2f\r\n"
3423 "loading_eta_seconds:%jd\r\n",
3424 (intmax_t) server.loading_start_time,
3425 (unsigned long long) server.loading_total_bytes,
3426 (unsigned long long) server.loading_loaded_bytes,
3427 perc,
3428 (intmax_t)eta
3429 );
3430 }
3431 }
3432
3433 /* Stats */
3434 if (allsections || defsections || !strcasecmp(section,"stats")) {
3435 if (sections++) info = sdscat(info,"\r\n");
3436 info = sdscatprintf(info,
3437 "# Stats\r\n"
3438 "total_connections_received:%lld\r\n"
3439 "total_commands_processed:%lld\r\n"
3440 "instantaneous_ops_per_sec:%lld\r\n"
3441 "total_net_input_bytes:%lld\r\n"
3442 "total_net_output_bytes:%lld\r\n"
3443 "instantaneous_input_kbps:%.2f\r\n"
3444 "instantaneous_output_kbps:%.2f\r\n"
3445 "rejected_connections:%lld\r\n"
3446 "sync_full:%lld\r\n"
3447 "sync_partial_ok:%lld\r\n"
3448 "sync_partial_err:%lld\r\n"
3449 "expired_keys:%lld\r\n"
3450 "expired_stale_perc:%.2f\r\n"
3451 "expired_time_cap_reached_count:%lld\r\n"
3452 "evicted_keys:%lld\r\n"
3453 "keyspace_hits:%lld\r\n"
3454 "keyspace_misses:%lld\r\n"
3455 "pubsub_channels:%ld\r\n"
3456 "pubsub_patterns:%lu\r\n"
3457 "latest_fork_usec:%lld\r\n"
3458 "migrate_cached_sockets:%ld\r\n"
3459 "slave_expires_tracked_keys:%zu\r\n"
3460 "active_defrag_hits:%lld\r\n"
3461 "active_defrag_misses:%lld\r\n"
3462 "active_defrag_key_hits:%lld\r\n"
3463 "active_defrag_key_misses:%lld\r\n",
3464 server.stat_numconnections,
3465 server.stat_numcommands,
3466 getInstantaneousMetric(STATS_METRIC_COMMAND),
3467 server.stat_net_input_bytes,
3468 server.stat_net_output_bytes,
3469 (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
3470 (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
3471 server.stat_rejected_conn,
3472 server.stat_sync_full,
3473 server.stat_sync_partial_ok,
3474 server.stat_sync_partial_err,
3475 server.stat_expiredkeys,
3476 server.stat_expired_stale_perc*100,
3477 server.stat_expired_time_cap_reached_count,
3478 server.stat_evictedkeys,
3479 server.stat_keyspace_hits,
3480 server.stat_keyspace_misses,
3481 dictSize(server.pubsub_channels),
3482 listLength(server.pubsub_patterns),
3483 server.stat_fork_time,
3484 dictSize(server.migrate_cached_sockets),
3485 getSlaveKeyWithExpireCount(),
3486 server.stat_active_defrag_hits,
3487 server.stat_active_defrag_misses,
3488 server.stat_active_defrag_key_hits,
3489 server.stat_active_defrag_key_misses);
3490 }
3491
3492 /* Replication */
3493 if (allsections || defsections || !strcasecmp(section,"replication")) {
3494 if (sections++) info = sdscat(info,"\r\n");
3495 info = sdscatprintf(info,
3496 "# Replication\r\n"
3497 "role:%s\r\n",
3498 server.masterhost == NULL ? "master" : "slave");
3499 if (server.masterhost) {
3500 long long slave_repl_offset = 1;
3501
3502 if (server.master)
3503 slave_repl_offset = server.master->reploff;
3504 else if (server.cached_master)
3505 slave_repl_offset = server.cached_master->reploff;
3506
3507 info = sdscatprintf(info,
3508 "master_host:%s\r\n"
3509 "master_port:%d\r\n"
3510 "master_link_status:%s\r\n"
3511 "master_last_io_seconds_ago:%d\r\n"
3512 "master_sync_in_progress:%d\r\n"
3513 "slave_repl_offset:%lld\r\n"
3514 ,server.masterhost,
3515 server.masterport,
3516 (server.repl_state == REPL_STATE_CONNECTED) ?
3517 "up" : "down",
3518 server.master ?
3519 ((int)(server.unixtime-server.master->lastinteraction)) : -1,
3520 server.repl_state == REPL_STATE_TRANSFER,
3521 slave_repl_offset
3522 );
3523
3524 if (server.repl_state == REPL_STATE_TRANSFER) {
3525 info = sdscatprintf(info,
3526 "master_sync_left_bytes:%lld\r\n"
3527 "master_sync_last_io_seconds_ago:%d\r\n"
3528 , (long long)
3529 (server.repl_transfer_size - server.repl_transfer_read),
3530 (int)(server.unixtime-server.repl_transfer_lastio)
3531 );
3532 }
3533
3534 if (server.repl_state != REPL_STATE_CONNECTED) {
3535 info = sdscatprintf(info,
3536 "master_link_down_since_seconds:%jd\r\n",
3537 (intmax_t)server.unixtime-server.repl_down_since);
3538 }
3539 info = sdscatprintf(info,
3540 "slave_priority:%d\r\n"
3541 "slave_read_only:%d\r\n",
3542 server.slave_priority,
3543 server.repl_slave_ro);
3544 }
3545
3546 info = sdscatprintf(info,
3547 "connected_slaves:%lu\r\n",
3548 listLength(server.slaves));
3549
3550 /* If min-slaves-to-write is active, write the number of slaves
3551 * currently considered 'good'. */
3552 if (server.repl_min_slaves_to_write &&
3553 server.repl_min_slaves_max_lag) {
3554 info = sdscatprintf(info,
3555 "min_slaves_good_slaves:%d\r\n",
3556 server.repl_good_slaves_count);
3557 }
3558
3559 if (listLength(server.slaves)) {
3560 int slaveid = 0;
3561 listNode *ln;
3562 listIter li;
3563
3564 listRewind(server.slaves,&li);
3565 while((ln = listNext(&li))) {
3566 client *slave = listNodeValue(ln);
3567 char *state = NULL;
3568 char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
3569 int port;
3570 long lag = 0;
3571
3572 if (slaveip[0] == '\0') {
3573 if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1)
3574 continue;
3575 slaveip = ip;
3576 }
3577 switch(slave->replstate) {
3578 case SLAVE_STATE_WAIT_BGSAVE_START:
3579 case SLAVE_STATE_WAIT_BGSAVE_END:
3580 state = "wait_bgsave";
3581 break;
3582 case SLAVE_STATE_SEND_BULK:
3583 state = "send_bulk";
3584 break;
3585 case SLAVE_STATE_ONLINE:
3586 state = "online";
3587 break;
3588 }
3589 if (state == NULL) continue;
3590 if (slave->replstate == SLAVE_STATE_ONLINE)
3591 lag = time(NULL) - slave->repl_ack_time;
3592
3593 info = sdscatprintf(info,
3594 "slave%d:ip=%s,port=%d,state=%s,"
3595 "offset=%lld,lag=%ld\r\n",
3596 slaveid,slaveip,slave->slave_listening_port,state,
3597 slave->repl_ack_off, lag);
3598 slaveid++;
3599 }
3600 }
3601 info = sdscatprintf(info,
3602 "master_replid:%s\r\n"
3603 "master_replid2:%s\r\n"
3604 "master_repl_offset:%lld\r\n"
3605 "second_repl_offset:%lld\r\n"
3606 "repl_backlog_active:%d\r\n"
3607 "repl_backlog_size:%lld\r\n"
3608 "repl_backlog_first_byte_offset:%lld\r\n"
3609 "repl_backlog_histlen:%lld\r\n",
3610 server.replid,
3611 server.replid2,
3612 server.master_repl_offset,
3613 server.second_replid_offset,
3614 server.repl_backlog != NULL,
3615 server.repl_backlog_size,
3616 server.repl_backlog_off,
3617 server.repl_backlog_histlen);
3618 }
3619
3620 /* CPU */
3621 if (allsections || defsections || !strcasecmp(section,"cpu")) {
3622 if (sections++) info = sdscat(info,"\r\n");
3623 info = sdscatprintf(info,
3624 "# CPU\r\n"
3625 "used_cpu_sys:%ld.%06ld\r\n"
3626 "used_cpu_user:%ld.%06ld\r\n"
3627 "used_cpu_sys_children:%ld.%06ld\r\n"
3628 "used_cpu_user_children:%ld.%06ld\r\n",
3629 (long)self_ru.ru_stime.tv_sec, (long)self_ru.ru_stime.tv_usec,
3630 (long)self_ru.ru_utime.tv_sec, (long)self_ru.ru_utime.tv_usec,
3631 (long)c_ru.ru_stime.tv_sec, (long)c_ru.ru_stime.tv_usec,
3632 (long)c_ru.ru_utime.tv_sec, (long)c_ru.ru_utime.tv_usec);
3633 }
3634
3635 /* Command statistics */
3636 if (allsections || !strcasecmp(section,"commandstats")) {
3637 if (sections++) info = sdscat(info,"\r\n");
3638 info = sdscatprintf(info, "# Commandstats\r\n");
3639
3640 struct redisCommand *c;
3641 dictEntry *de;
3642 dictIterator *di;
3643 di = dictGetSafeIterator(server.commands);
3644 while((de = dictNext(di)) != NULL) {
3645 c = (struct redisCommand *) dictGetVal(de);
3646 if (!c->calls) continue;
3647 info = sdscatprintf(info,
3648 "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n",
3649 c->name, c->calls, c->microseconds,
3650 (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls));
3651 }
3652 dictReleaseIterator(di);
3653 }
3654
3655 /* Cluster */
3656 if (allsections || defsections || !strcasecmp(section,"cluster")) {
3657 if (sections++) info = sdscat(info,"\r\n");
3658 info = sdscatprintf(info,
3659 "# Cluster\r\n"
3660 "cluster_enabled:%d\r\n",
3661 server.cluster_enabled);
3662 }
3663
3664 /* Key space */
3665 if (allsections || defsections || !strcasecmp(section,"keyspace")) {
3666 if (sections++) info = sdscat(info,"\r\n");
3667 info = sdscatprintf(info, "# Keyspace\r\n");
3668 for (j = 0; j < server.dbnum; j++) {
3669 long long keys, vkeys;
3670
3671 keys = dictSize(server.db[j].dict);
3672 vkeys = dictSize(server.db[j].expires);
3673 if (keys || vkeys) {
3674 info = sdscatprintf(info,
3675 "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
3676 j, keys, vkeys, server.db[j].avg_ttl);
3677 }
3678 }
3679 }
3680 return info;
3681 }
3682
/* INFO [section]
 *
 * Reply with the text built by genRedisInfoString() as a bulk string.
 * Without arguments the "default" section is requested, with one argument
 * that argument is used as section name; more than one argument is a
 * syntax error. */
void infoCommand(client *c) {
    char *section;

    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    }

    if (c->argc == 2)
        section = c->argv[1]->ptr;
    else
        section = "default";

    addReplyBulkSds(c, genRedisInfoString(section));
}
3692
/* MONITOR
 *
 * Flag the client as CLIENT_SLAVE|CLIENT_MONITOR, append it to
 * server.monitors and reply with OK. A client that already has the
 * CLIENT_SLAVE flag set (a slave, or a client already monitoring) is left
 * untouched and receives no reply at all. */
void monitorCommand(client *c) {
    if (!(c->flags & CLIENT_SLAVE)) {
        c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
        listAddNodeTail(server.monitors,c);
        addReply(c,shared.ok);
    }
}
3701
3702 /* =================================== Main! ================================ */
3703
3704 #ifdef __linux__
/* Return the integer found on the first line of
 * /proc/sys/vm/overcommit_memory, as converted by atoi(), or -1 if the
 * file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    if (fgets(line,sizeof(line),fp) != NULL) value = atoi(line);
    fclose(fp);
    return value;
}
3718
linuxMemoryWarnings(void)3719 void linuxMemoryWarnings(void) {
3720 if (linuxOvercommitMemoryValue() == 0) {
3721 serverLog(LL_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
3722 }
3723 if (THPIsEnabled()) {
3724 serverLog(LL_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.");
3725 }
3726 }
3727 #endif /* __linux__ */
3728
createPidFile(void)3729 void createPidFile(void) {
3730 /* If pidfile requested, but no pidfile defined, use
3731 * default pidfile path */
3732 if (!server.pidfile) server.pidfile = zstrdup(CONFIG_DEFAULT_PID_FILE);
3733
3734 /* Try to write the pid file in a best-effort way. */
3735 FILE *fp = fopen(server.pidfile,"w");
3736 if (fp) {
3737 fprintf(fp,"%d\n",(int)getpid());
3738 fclose(fp);
3739 }
3740 }
3741
/* Detach the process from the controlling terminal: fork, let the parent
 * exit with status 0, make the child the leader of a new session, and
 * redirect its stdin, stdout and stderr to /dev/null.
 *
 * If fork() fails there is no child that could carry on: the error is
 * reported on stderr and the process exits with status 1, instead of
 * exiting with 0 as if daemonizing had succeeded. */
void daemonize(void) {
    int fd;
    pid_t pid = fork();

    if (pid == -1) {
        fprintf(stderr,"Can't daemonize, fork() failed: %s\n",
            strerror(errno));
        exit(1);
    }
    if (pid != 0) exit(0); /* parent exits */
    setsid(); /* create a new session */

    /* Every output goes to /dev/null. If Redis is daemonized but
     * the 'logfile' is set to 'stdout' in the configuration file
     * it will not log at all. */
    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        /* Don't close the descriptor if it is one of the standard ones. */
        if (fd > STDERR_FILENO) close(fd);
    }
}
3758
version(void)3759 void version(void) {
3760 printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n",
3761 REDIS_VERSION,
3762 redisGitSHA1(),
3763 atoi(redisGitDirty()) > 0,
3764 ZMALLOC_LIB,
3765 sizeof(long) == 4 ? 32 : 64,
3766 (unsigned long long) redisBuildId());
3767 exit(0);
3768 }
3769
/* Print the command line help on stderr and exit with status 1. */
void usage(void) {
    /* The help text, one output chunk per entry, NULL terminated. */
    static const char *const text[] = {
        "Usage: ./redis-server [/path/to/redis.conf] [options]\n",
        " ./redis-server - (read config from stdin)\n",
        " ./redis-server -v or --version\n",
        " ./redis-server -h or --help\n",
        " ./redis-server --test-memory <megabytes>\n\n",
        "Examples:\n",
        " ./redis-server (run the server with default conf)\n",
        " ./redis-server /etc/redis/6379.conf\n",
        " ./redis-server --port 7777\n",
        " ./redis-server --port 7777 --replicaof 127.0.0.1 8888\n",
        " ./redis-server /etc/myredis.conf --loglevel verbose\n\n",
        "Sentinel mode:\n",
        " ./redis-server /etc/sentinel.conf --sentinel\n",
        NULL
    };
    int i;

    for (i = 0; text[i] != NULL; i++) fputs(text[i],stderr);
    exit(1);
}
3786
redisAsciiArt(void)3787 void redisAsciiArt(void) {
3788 #include "asciilogo.h"
3789 char *buf = zmalloc(1024*16);
3790 char *mode;
3791
3792 if (server.cluster_enabled) mode = "cluster";
3793 else if (server.sentinel_mode) mode = "sentinel";
3794 else mode = "standalone";
3795
3796 /* Show the ASCII logo if: log file is stdout AND stdout is a
3797 * tty AND syslog logging is disabled. Also show logo if the user
3798 * forced us to do so via redis.conf. */
3799 int show_logo = ((!server.syslog_enabled &&
3800 server.logfile[0] == '\0' &&
3801 isatty(fileno(stdout))) ||
3802 server.always_show_logo);
3803
3804 if (!show_logo) {
3805 serverLog(LL_NOTICE,
3806 "Running mode=%s, port=%d.",
3807 mode, server.port
3808 );
3809 } else {
3810 snprintf(buf,1024*16,ascii_logo,
3811 REDIS_VERSION,
3812 redisGitSHA1(),
3813 strtol(redisGitDirty(),NULL,10) > 0,
3814 (sizeof(long) == 8) ? "64" : "32",
3815 mode, server.port,
3816 (long) getpid()
3817 );
3818 serverLogRaw(LL_NOTICE|LL_RAW,buf);
3819 }
3820 zfree(buf);
3821 }
3822
sigShutdownHandler(int sig)3823 static void sigShutdownHandler(int sig) {
3824 char *msg;
3825
3826 switch (sig) {
3827 case SIGINT:
3828 msg = "Received SIGINT scheduling shutdown...";
3829 break;
3830 case SIGTERM:
3831 msg = "Received SIGTERM scheduling shutdown...";
3832 break;
3833 default:
3834 msg = "Received shutdown signal, scheduling shutdown...";
3835 };
3836
3837 /* SIGINT is often delivered via Ctrl+C in an interactive session.
3838 * If we receive the signal the second time, we interpret this as
3839 * the user really wanting to quit ASAP without waiting to persist
3840 * on disk. */
3841 if (server.shutdown_asap && sig == SIGINT) {
3842 serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
3843 rdbRemoveTempFile(getpid());
3844 exit(1); /* Exit with an error since this was not a clean shutdown. */
3845 } else if (server.loading) {
3846 serverLogFromHandler(LL_WARNING, "Received shutdown signal during loading, exiting now.");
3847 exit(0);
3848 }
3849
3850 serverLogFromHandler(LL_WARNING, msg);
3851 server.shutdown_asap = 1;
3852 }
3853
setupSignalHandlers(void)3854 void setupSignalHandlers(void) {
3855 struct sigaction act;
3856
3857 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
3858 * Otherwise, sa_handler is used. */
3859 sigemptyset(&act.sa_mask);
3860 act.sa_flags = 0;
3861 act.sa_handler = sigShutdownHandler;
3862 sigaction(SIGTERM, &act, NULL);
3863 sigaction(SIGINT, &act, NULL);
3864
3865 #ifdef HAVE_BACKTRACE
3866 sigemptyset(&act.sa_mask);
3867 act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
3868 act.sa_sigaction = sigsegvHandler;
3869 sigaction(SIGSEGV, &act, NULL);
3870 sigaction(SIGBUS, &act, NULL);
3871 sigaction(SIGFPE, &act, NULL);
3872 sigaction(SIGILL, &act, NULL);
3873 #endif
3874 return;
3875 }
3876
3877 void memtest(size_t megabytes, int passes);
3878
/* Tell whether the process was asked to run as a Sentinel.
 *
 * Returns 1 if argv[0] contains the string "redis-sentinel", or if any of
 * the following arguments is exactly "--sentinel". Returns 0 otherwise. */
int checkForSentinelMode(int argc, char **argv) {
    int idx = 1;

    if (strstr(argv[0],"redis-sentinel")) return 1;

    while (idx < argc) {
        if (strcmp(argv[idx],"--sentinel") == 0) return 1;
        idx++;
    }
    return 0;
}
3889
3890 /* Function called at startup to load RDB or AOF file in memory. */
loadDataFromDisk(void)3891 void loadDataFromDisk(void) {
3892 long long start = ustime();
3893 if (server.aof_state == AOF_ON) {
3894 if (loadAppendOnlyFile(server.aof_filename) == C_OK)
3895 serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
3896 } else {
3897 rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
3898 if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
3899 serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
3900 (float)(ustime()-start)/1000000);
3901
3902 /* Restore the replication ID / offset from the RDB file. */
3903 if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself)))&&
3904 rsi.repl_id_is_set &&
3905 rsi.repl_offset != -1 &&
3906 /* Note that older implementations may save a repl_stream_db
3907 * of -1 inside the RDB file in a wrong way, see more information
3908 * in function rdbPopulateSaveInfo. */
3909 rsi.repl_stream_db != -1)
3910 {
3911 memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
3912 server.master_repl_offset = rsi.repl_offset;
3913 /* If we are a slave, create a cached master from this
3914 * information, in order to allow partial resynchronizations
3915 * with masters. */
3916 replicationCacheMasterUsingMyself();
3917 selectDb(server.cached_master,rsi.repl_stream_db);
3918 }
3919 } else if (errno != ENOENT) {
3920 serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
3921 exit(1);
3922 }
3923 }
3924 }
3925
redisOutOfMemoryHandler(size_t allocation_size)3926 void redisOutOfMemoryHandler(size_t allocation_size) {
3927 serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!",
3928 allocation_size);
3929 serverPanic("Redis aborting for OUT OF MEMORY");
3930 }
3931
/* Set the process title to "<title> <addr>:<port><mode>", where <addr> is
 * the first bind address ("*" when there is none) and <mode> is
 * " [cluster]", " [sentinel]" or nothing.
 *
 * Without USE_SETPROCTITLE this function does nothing. */
void redisSetProcTitle(char *title) {
#ifdef USE_SETPROCTITLE
    const char *addr = server.bindaddr_count ? server.bindaddr[0] : "*";
    const char *server_mode;

    if (server.cluster_enabled)
        server_mode = " [cluster]";
    else if (server.sentinel_mode)
        server_mode = " [sentinel]";
    else
        server_mode = "";

    setproctitle("%s %s:%d%s", title, addr, server.port, server_mode);
#else
    UNUSED(title);
#endif
}
3947
3948 /*
3949 * Check whether systemd or upstart have been used to start redis.
3950 */
3951
redisSupervisedUpstart(void)3952 int redisSupervisedUpstart(void) {
3953 const char *upstart_job = getenv("UPSTART_JOB");
3954
3955 if (!upstart_job) {
3956 serverLog(LL_WARNING,
3957 "upstart supervision requested, but UPSTART_JOB not found");
3958 return 0;
3959 }
3960
3961 serverLog(LL_NOTICE, "supervised by upstart, will stop to signal readiness");
3962 raise(SIGSTOP);
3963 unsetenv("UPSTART_JOB");
3964 return 1;
3965 }
3966
redisSupervisedSystemd(void)3967 int redisSupervisedSystemd(void) {
3968 const char *notify_socket = getenv("NOTIFY_SOCKET");
3969 int fd = 1;
3970 struct sockaddr_un su;
3971 struct iovec iov;
3972 struct msghdr hdr;
3973 int sendto_flags = 0;
3974
3975 if (!notify_socket) {
3976 serverLog(LL_WARNING,
3977 "systemd supervision requested, but NOTIFY_SOCKET not found");
3978 return 0;
3979 }
3980
3981 if ((strchr("@/", notify_socket[0])) == NULL || strlen(notify_socket) < 2) {
3982 return 0;
3983 }
3984
3985 serverLog(LL_NOTICE, "supervised by systemd, will signal readiness");
3986 if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) {
3987 serverLog(LL_WARNING,
3988 "Can't connect to systemd socket %s", notify_socket);
3989 return 0;
3990 }
3991
3992 memset(&su, 0, sizeof(su));
3993 su.sun_family = AF_UNIX;
3994 strncpy (su.sun_path, notify_socket, sizeof(su.sun_path) -1);
3995 su.sun_path[sizeof(su.sun_path) - 1] = '\0';
3996
3997 if (notify_socket[0] == '@')
3998 su.sun_path[0] = '\0';
3999
4000 memset(&iov, 0, sizeof(iov));
4001 iov.iov_base = "READY=1";
4002 iov.iov_len = strlen("READY=1");
4003
4004 memset(&hdr, 0, sizeof(hdr));
4005 hdr.msg_name = &su;
4006 hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) +
4007 strlen(notify_socket);
4008 hdr.msg_iov = &iov;
4009 hdr.msg_iovlen = 1;
4010
4011 unsetenv("NOTIFY_SOCKET");
4012 #ifdef HAVE_MSG_NOSIGNAL
4013 sendto_flags |= MSG_NOSIGNAL;
4014 #endif
4015 if (sendmsg(fd, &hdr, sendto_flags) < 0) {
4016 serverLog(LL_WARNING, "Can't send notification to systemd");
4017 close(fd);
4018 return 0;
4019 }
4020 close(fd);
4021 return 1;
4022 }
4023
redisIsSupervised(int mode)4024 int redisIsSupervised(int mode) {
4025 if (mode == SUPERVISED_AUTODETECT) {
4026 const char *upstart_job = getenv("UPSTART_JOB");
4027 const char *notify_socket = getenv("NOTIFY_SOCKET");
4028
4029 if (upstart_job) {
4030 redisSupervisedUpstart();
4031 } else if (notify_socket) {
4032 redisSupervisedSystemd();
4033 }
4034 } else if (mode == SUPERVISED_UPSTART) {
4035 return redisSupervisedUpstart();
4036 } else if (mode == SUPERVISED_SYSTEMD) {
4037 return redisSupervisedSystemd();
4038 }
4039
4040 return 0;
4041 }
4042
loop(void * arg)4043 static int loop(void *arg) {
4044 aeMain((aeEventLoop *)arg);
4045 return 0;
4046 }
4047
main(int argc,char ** argv)4048 int main(int argc, char **argv) {
4049 struct timeval tv;
4050 int j;
4051
4052 #ifdef HAVE_FF_KQUEUE
4053 int rc = ff_init(argc, argv);
4054 assert(0 == rc);
4055 ff_mod_init();
4056 int new_argc = argc - 4;
4057 if (new_argc <= 0) {
4058 new_argc = 1;
4059 }
4060
4061 char **new_argv = zmalloc(sizeof(char *) * new_argc);
4062 new_argv[0] = argv[0];
4063 int i;
4064 for (i = 1; i < new_argc; i++) {
4065 new_argv[i] = argv[i + 4];
4066 }
4067 argv = new_argv;
4068 argc = new_argc;
4069 #endif
4070
4071
4072 #ifdef REDIS_TEST
4073 if (argc == 3 && !strcasecmp(argv[1], "test")) {
4074 if (!strcasecmp(argv[2], "ziplist")) {
4075 return ziplistTest(argc, argv);
4076 } else if (!strcasecmp(argv[2], "quicklist")) {
4077 quicklistTest(argc, argv);
4078 } else if (!strcasecmp(argv[2], "intset")) {
4079 return intsetTest(argc, argv);
4080 } else if (!strcasecmp(argv[2], "zipmap")) {
4081 return zipmapTest(argc, argv);
4082 } else if (!strcasecmp(argv[2], "sha1test")) {
4083 return sha1Test(argc, argv);
4084 } else if (!strcasecmp(argv[2], "util")) {
4085 return utilTest(argc, argv);
4086 } else if (!strcasecmp(argv[2], "endianconv")) {
4087 return endianconvTest(argc, argv);
4088 } else if (!strcasecmp(argv[2], "crc64")) {
4089 return crc64Test(argc, argv);
4090 } else if (!strcasecmp(argv[2], "zmalloc")) {
4091 return zmalloc_test(argc, argv);
4092 }
4093
4094 return -1; /* test not found */
4095 }
4096 #endif
4097
4098 /* We need to initialize our libraries, and the server configuration. */
4099 #ifdef INIT_SETPROCTITLE_REPLACEMENT
4100 spt_init(argc, argv);
4101 #endif
4102 setlocale(LC_COLLATE,"");
4103 tzset(); /* Populates 'timezone' global. */
4104 zmalloc_set_oom_handler(redisOutOfMemoryHandler);
4105 srand(time(NULL)^getpid());
4106 gettimeofday(&tv,NULL);
4107
4108 char hashseed[16];
4109 getRandomHexChars(hashseed,sizeof(hashseed));
4110 dictSetHashFunctionSeed((uint8_t*)hashseed);
4111 server.sentinel_mode = checkForSentinelMode(argc,argv);
4112 initServerConfig();
4113 moduleInitModulesSystem();
4114
4115 /* Store the executable path and arguments in a safe place in order
4116 * to be able to restart the server later. */
4117 server.executable = getAbsolutePath(argv[0]);
4118 server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
4119 server.exec_argv[argc] = NULL;
4120 for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
4121
4122 /* We need to init sentinel right now as parsing the configuration file
4123 * in sentinel mode will have the effect of populating the sentinel
4124 * data structures with master nodes to monitor. */
4125 if (server.sentinel_mode) {
4126 initSentinelConfig();
4127 initSentinel();
4128 }
4129
4130 /* Check if we need to start in redis-check-rdb/aof mode. We just execute
4131 * the program main. However the program is part of the Redis executable
4132 * so that we can easily execute an RDB check on loading errors. */
4133 if (strstr(argv[0],"redis-check-rdb") != NULL)
4134 redis_check_rdb_main(argc,argv,NULL);
4135 else if (strstr(argv[0],"redis-check-aof") != NULL)
4136 redis_check_aof_main(argc,argv);
4137
4138 if (argc >= 2) {
4139 j = 1; /* First option to parse in argv[] */
4140 sds options = sdsempty();
4141 char *configfile = NULL;
4142
4143 /* Handle special options --help and --version */
4144 if (strcmp(argv[1], "-v") == 0 ||
4145 strcmp(argv[1], "--version") == 0) version();
4146 if (strcmp(argv[1], "--help") == 0 ||
4147 strcmp(argv[1], "-h") == 0) usage();
4148 if (strcmp(argv[1], "--test-memory") == 0) {
4149 if (argc == 3) {
4150 memtest(atoi(argv[2]),50);
4151 exit(0);
4152 } else {
4153 fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
4154 fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
4155 exit(1);
4156 }
4157 }
4158
4159 /* First argument is the config file name? */
4160 if (argv[j][0] != '-' || argv[j][1] != '-') {
4161 configfile = argv[j];
4162 server.configfile = getAbsolutePath(configfile);
4163 /* Replace the config file in server.exec_argv with
4164 * its absolute path. */
4165 zfree(server.exec_argv[j]);
4166 server.exec_argv[j] = zstrdup(server.configfile);
4167 j++;
4168 }
4169
4170 /* All the other options are parsed and conceptually appended to the
4171 * configuration file. For instance --port 6380 will generate the
4172 * string "port 6380\n" to be parsed after the actual file name
4173 * is parsed, if any. */
4174 while(j != argc) {
4175 if (argv[j][0] == '-' && argv[j][1] == '-') {
4176 /* Option name */
4177 if (!strcmp(argv[j], "--check-rdb")) {
4178 /* Argument has no options, need to skip for parsing. */
4179 j++;
4180 continue;
4181 }
4182 if (sdslen(options)) options = sdscat(options,"\n");
4183 options = sdscat(options,argv[j]+2);
4184 options = sdscat(options," ");
4185 } else {
4186 /* Option argument */
4187 options = sdscatrepr(options,argv[j],strlen(argv[j]));
4188 options = sdscat(options," ");
4189 }
4190 j++;
4191 }
4192 if (server.sentinel_mode && configfile && *configfile == '-') {
4193 serverLog(LL_WARNING,
4194 "Sentinel config from STDIN not allowed.");
4195 serverLog(LL_WARNING,
4196 "Sentinel needs config file on disk to save state. Exiting...");
4197 exit(1);
4198 }
4199 resetServerSaveParams();
4200 loadServerConfig(configfile,options);
4201 sdsfree(options);
4202 }
4203
4204 serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo");
4205 serverLog(LL_WARNING,
4206 "Redis version=%s, bits=%d, commit=%s, modified=%d, pid=%d, just started",
4207 REDIS_VERSION,
4208 (sizeof(long) == 8) ? 64 : 32,
4209 redisGitSHA1(),
4210 strtol(redisGitDirty(),NULL,10) > 0,
4211 (int)getpid());
4212
4213 if (argc == 1) {
4214 serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
4215 } else {
4216 serverLog(LL_WARNING, "Configuration loaded");
4217 }
4218
4219 server.supervised = redisIsSupervised(server.supervised_mode);
4220 int background = server.daemonize && !server.supervised;
4221 if (background) daemonize();
4222
4223 initServer();
4224 if (background || server.pidfile) createPidFile();
4225 redisSetProcTitle(argv[0]);
4226 redisAsciiArt();
4227 checkTcpBacklogSettings();
4228
4229 if (!server.sentinel_mode) {
4230 /* Things not needed when running in Sentinel mode. */
4231 serverLog(LL_WARNING,"Server initialized");
4232 #ifdef __linux__
4233 linuxMemoryWarnings();
4234 #endif
4235 moduleLoadFromQueue();
4236 loadDataFromDisk();
4237 if (server.cluster_enabled) {
4238 if (verifyClusterConfigWithData() == C_ERR) {
4239 serverLog(LL_WARNING,
4240 "You can't have keys in a DB different than DB 0 when in "
4241 "Cluster mode. Exiting.");
4242 exit(1);
4243 }
4244 }
4245 if (server.ipfd_count > 0)
4246 serverLog(LL_NOTICE,"Ready to accept connections");
4247 if (server.sofd > 0)
4248 serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
4249 } else {
4250 sentinelIsRunning();
4251 }
4252
4253 /* Warning the user about suspicious maxmemory setting. */
4254 if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
4255 serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
4256 }
4257
4258 aeSetBeforeSleepProc(server.el,beforeSleep);
4259 aeSetAfterSleepProc(server.el,afterSleep);
4260 ff_run(loop, server.el);
4261 aeDeleteEventLoop(server.el);
4262 #ifdef HAVE_FF_KQUEUE
4263 zfree(new_argv);
4264 #endif
4265 return 0;
4266 }
4267
4268 /* The End */
4269