1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "server.h"
31 #include "cluster.h"
32 #include "slowlog.h"
33 #include "bio.h"
34 #include "latency.h"
35
36 #include <time.h>
37 #include <signal.h>
38 #include <sys/wait.h>
39 #include <errno.h>
40 #include <assert.h>
41 #include <ctype.h>
42 #include <stdarg.h>
43 #include <arpa/inet.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46 #include <sys/time.h>
47 #include <sys/resource.h>
48 #include <sys/uio.h>
49 #include <sys/un.h>
50 #include <limits.h>
51 #include <float.h>
52 #include <math.h>
53 #include <sys/resource.h>
54 #include <sys/utsname.h>
55 #include <locale.h>
56 #include <sys/socket.h>
57
/* Our shared "common" objects: one global instance of
 * struct sharedObjectsStruct. Its fields are filled in elsewhere
 * (not in this part of the file). */

struct sharedObjectsStruct shared;

/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime (instead of with constant initializers) to avoid strange
 * compiler optimizations. By their names they presumably hold 0.0, +inf,
 * -inf and NaN -- NOTE(review): confirm against the initialization code. */

double R_Zero, R_PosInf, R_NegInf, R_Nan;

/*================================= Globals ================================= */

/* Global vars */
struct redisServer server; /* server global state */
72
73 /* Our command table.
74 *
75 * Every entry is composed of the following fields:
76 *
77 * name: a string representing the command name.
78 * function: pointer to the C function implementing the command.
79 * arity: number of arguments, it is possible to use -N to say >= N
80 * sflags: command flags as string. See below for a table of flags.
81 * flags: flags as bitmask. Computed by Redis using the 'sflags' field.
82 * get_keys_proc: an optional function to get key arguments from a command.
83 * This is only used when the following three fields are not
84 * enough to specify what arguments are keys.
85 * first_key_index: first argument that is a key
86 * last_key_index: last argument that is a key
87 * key_step: step to get all the keys from first to last argument. For instance
88 * in MSET the step is two since arguments are key,val,key,val,...
89 * microseconds: microseconds of total execution time for this command.
90 * calls: total number of calls of this command.
91 *
92 * The flags, microseconds and calls fields are computed by Redis and should
93 * always be set to zero.
94 *
95 * Command flags are expressed using strings where every character represents
96 * a flag. Later the populateCommandTable() function will take care of
97 * populating the real 'flags' field using this characters.
98 *
99 * This is the meaning of the flags:
100 *
101 * w: write command (may modify the key space).
102 * r: read command (will never modify the key space).
103 * m: may increase memory usage once called. Don't allow if out of memory.
104 * a: admin command, like SAVE or SHUTDOWN.
105 * p: Pub/Sub related command.
106 * f: force replication of this command, regardless of server.dirty.
107 * s: command not allowed in scripts.
108 * R: random command. Command is not deterministic, that is, the same command
109 * with the same arguments, with the same key space, may have different
110 * results. For instance SPOP and RANDOMKEY are two random commands.
111 * S: Sort command output array if called from script, so that the output
112 * is deterministic.
113 * l: Allow command while loading the database.
114 * t: Allow command while a slave has stale data but is not allowed to
115 * server this data. Normally no command is accepted in this condition
116 * but just a few.
117 * M: Do not automatically propagate the command on MONITOR.
118 * k: Perform an implicit ASKING for this command, so the command will be
119 * accepted in cluster mode if the slot is marked as 'importing'.
120 * F: Fast command: O(1) or O(log(N)) command that should never delay
121 * its execution as long as the kernel scheduler is giving us time.
122 * Note that commands that may trigger a DEL as a side effect (like SET)
123 * are not fast commands.
124 */
125 struct redisCommand redisCommandTable[] = {
126 {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
127 {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
128 {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
129 {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
130 {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
131 {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
132 {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
133 {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
134 {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
135 {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
136 {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
137 {"bitfield",bitfieldCommand,-2,"wm",0,NULL,1,1,1,0,0},
138 {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
139 {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
140 {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
141 {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
142 {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
143 {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
144 {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
145 {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
146 {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
147 {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
148 {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
149 {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
150 {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
151 {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
152 {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
153 {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
154 {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
155 {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
156 {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
157 {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
158 {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
159 {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
160 {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
161 {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
162 {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
163 {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
164 {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
165 {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
166 {"spop",spopCommand,-2,"wRF",0,NULL,1,1,1,0,0},
167 {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
168 {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
169 {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
170 {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
171 {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
172 {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
173 {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
174 {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
175 {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
176 {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
177 {"zincrby",zincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
178 {"zrem",zremCommand,-3,"wF",0,NULL,1,1,1,0,0},
179 {"zremrangebyscore",zremrangebyscoreCommand,4,"w",0,NULL,1,1,1,0,0},
180 {"zremrangebyrank",zremrangebyrankCommand,4,"w",0,NULL,1,1,1,0,0},
181 {"zremrangebylex",zremrangebylexCommand,4,"w",0,NULL,1,1,1,0,0},
182 {"zunionstore",zunionstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
183 {"zinterstore",zinterstoreCommand,-4,"wm",0,zunionInterGetKeys,0,0,0,0,0},
184 {"zrange",zrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
185 {"zrangebyscore",zrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
186 {"zrevrangebyscore",zrevrangebyscoreCommand,-4,"r",0,NULL,1,1,1,0,0},
187 {"zrangebylex",zrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
188 {"zrevrangebylex",zrevrangebylexCommand,-4,"r",0,NULL,1,1,1,0,0},
189 {"zcount",zcountCommand,4,"rF",0,NULL,1,1,1,0,0},
190 {"zlexcount",zlexcountCommand,4,"rF",0,NULL,1,1,1,0,0},
191 {"zrevrange",zrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
192 {"zcard",zcardCommand,2,"rF",0,NULL,1,1,1,0,0},
193 {"zscore",zscoreCommand,3,"rF",0,NULL,1,1,1,0,0},
194 {"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
195 {"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
196 {"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
197 {"hset",hsetCommand,4,"wmF",0,NULL,1,1,1,0,0},
198 {"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
199 {"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
200 {"hmset",hmsetCommand,-4,"wm",0,NULL,1,1,1,0,0},
201 {"hmget",hmgetCommand,-3,"r",0,NULL,1,1,1,0,0},
202 {"hincrby",hincrbyCommand,4,"wmF",0,NULL,1,1,1,0,0},
203 {"hincrbyfloat",hincrbyfloatCommand,4,"wmF",0,NULL,1,1,1,0,0},
204 {"hdel",hdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
205 {"hlen",hlenCommand,2,"rF",0,NULL,1,1,1,0,0},
206 {"hstrlen",hstrlenCommand,3,"rF",0,NULL,1,1,1,0,0},
207 {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
208 {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
209 {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0},
210 {"hexists",hexistsCommand,3,"rF",0,NULL,1,1,1,0,0},
211 {"hscan",hscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
212 {"incrby",incrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
213 {"decrby",decrbyCommand,3,"wmF",0,NULL,1,1,1,0,0},
214 {"incrbyfloat",incrbyfloatCommand,3,"wmF",0,NULL,1,1,1,0,0},
215 {"getset",getsetCommand,3,"wm",0,NULL,1,1,1,0,0},
216 {"mset",msetCommand,-3,"wm",0,NULL,1,-1,2,0,0},
217 {"msetnx",msetnxCommand,-3,"wm",0,NULL,1,-1,2,0,0},
218 {"randomkey",randomkeyCommand,1,"rR",0,NULL,0,0,0,0,0},
219 {"select",selectCommand,2,"lF",0,NULL,0,0,0,0,0},
220 {"move",moveCommand,3,"wF",0,NULL,1,1,1,0,0},
221 {"rename",renameCommand,3,"w",0,NULL,1,2,1,0,0},
222 {"renamenx",renamenxCommand,3,"wF",0,NULL,1,2,1,0,0},
223 {"expire",expireCommand,3,"wF",0,NULL,1,1,1,0,0},
224 {"expireat",expireatCommand,3,"wF",0,NULL,1,1,1,0,0},
225 {"pexpire",pexpireCommand,3,"wF",0,NULL,1,1,1,0,0},
226 {"pexpireat",pexpireatCommand,3,"wF",0,NULL,1,1,1,0,0},
227 {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
228 {"scan",scanCommand,-2,"rR",0,NULL,0,0,0,0,0},
229 {"dbsize",dbsizeCommand,1,"rF",0,NULL,0,0,0,0,0},
230 {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0},
231 {"ping",pingCommand,-1,"tF",0,NULL,0,0,0,0,0},
232 {"echo",echoCommand,2,"F",0,NULL,0,0,0,0,0},
233 {"save",saveCommand,1,"as",0,NULL,0,0,0,0,0},
234 {"bgsave",bgsaveCommand,-1,"a",0,NULL,0,0,0,0,0},
235 {"bgrewriteaof",bgrewriteaofCommand,1,"a",0,NULL,0,0,0,0,0},
236 {"shutdown",shutdownCommand,-1,"alt",0,NULL,0,0,0,0,0},
237 {"lastsave",lastsaveCommand,1,"RF",0,NULL,0,0,0,0,0},
238 {"type",typeCommand,2,"rF",0,NULL,1,1,1,0,0},
239 {"multi",multiCommand,1,"sF",0,NULL,0,0,0,0,0},
240 {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
241 {"discard",discardCommand,1,"sF",0,NULL,0,0,0,0,0},
242 {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
243 {"psync",syncCommand,3,"ars",0,NULL,0,0,0,0,0},
244 {"replconf",replconfCommand,-1,"aslt",0,NULL,0,0,0,0,0},
245 {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
246 {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
247 {"sort",sortCommand,-2,"wm",0,sortGetKeys,1,1,1,0,0},
248 {"info",infoCommand,-1,"lt",0,NULL,0,0,0,0,0},
249 {"monitor",monitorCommand,1,"as",0,NULL,0,0,0,0,0},
250 {"ttl",ttlCommand,2,"rF",0,NULL,1,1,1,0,0},
251 {"touch",touchCommand,-2,"rF",0,NULL,1,1,1,0,0},
252 {"pttl",pttlCommand,2,"rF",0,NULL,1,1,1,0,0},
253 {"persist",persistCommand,2,"wF",0,NULL,1,1,1,0,0},
254 {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
255 {"role",roleCommand,1,"lst",0,NULL,0,0,0,0,0},
256 {"debug",debugCommand,-1,"as",0,NULL,0,0,0,0,0},
257 {"config",configCommand,-2,"lat",0,NULL,0,0,0,0,0},
258 {"subscribe",subscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
259 {"unsubscribe",unsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
260 {"psubscribe",psubscribeCommand,-2,"pslt",0,NULL,0,0,0,0,0},
261 {"punsubscribe",punsubscribeCommand,-1,"pslt",0,NULL,0,0,0,0,0},
262 {"publish",publishCommand,3,"pltF",0,NULL,0,0,0,0,0},
263 {"pubsub",pubsubCommand,-2,"pltR",0,NULL,0,0,0,0,0},
264 {"watch",watchCommand,-2,"sF",0,NULL,1,-1,1,0,0},
265 {"unwatch",unwatchCommand,1,"sF",0,NULL,0,0,0,0,0},
266 {"cluster",clusterCommand,-2,"a",0,NULL,0,0,0,0,0},
267 {"restore",restoreCommand,-4,"wm",0,NULL,1,1,1,0,0},
268 {"restore-asking",restoreCommand,-4,"wmk",0,NULL,1,1,1,0,0},
269 {"migrate",migrateCommand,-6,"w",0,migrateGetKeys,0,0,0,0,0},
270 {"asking",askingCommand,1,"F",0,NULL,0,0,0,0,0},
271 {"readonly",readonlyCommand,1,"F",0,NULL,0,0,0,0,0},
272 {"readwrite",readwriteCommand,1,"F",0,NULL,0,0,0,0,0},
273 {"dump",dumpCommand,2,"r",0,NULL,1,1,1,0,0},
274 {"object",objectCommand,3,"r",0,NULL,2,2,2,0,0},
275 {"client",clientCommand,-2,"as",0,NULL,0,0,0,0,0},
276 {"eval",evalCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
277 {"evalsha",evalShaCommand,-3,"s",0,evalGetKeys,0,0,0,0,0},
278 {"slowlog",slowlogCommand,-2,"a",0,NULL,0,0,0,0,0},
279 {"script",scriptCommand,-2,"s",0,NULL,0,0,0,0,0},
280 {"time",timeCommand,1,"RF",0,NULL,0,0,0,0,0},
281 {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
282 {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0},
283 {"bitpos",bitposCommand,-3,"r",0,NULL,1,1,1,0,0},
284 {"wait",waitCommand,3,"s",0,NULL,0,0,0,0,0},
285 {"command",commandCommand,0,"lt",0,NULL,0,0,0,0,0},
286 {"geoadd",geoaddCommand,-5,"wm",0,NULL,1,1,1,0,0},
287 {"georadius",georadiusCommand,-6,"w",0,NULL,1,1,1,0,0},
288 {"georadiusbymember",georadiusByMemberCommand,-5,"w",0,NULL,1,1,1,0,0},
289 {"geohash",geohashCommand,-2,"r",0,NULL,1,1,1,0,0},
290 {"geopos",geoposCommand,-2,"r",0,NULL,1,1,1,0,0},
291 {"geodist",geodistCommand,-4,"r",0,NULL,1,1,1,0,0},
292 {"pfselftest",pfselftestCommand,1,"a",0,NULL,0,0,0,0,0},
293 {"pfadd",pfaddCommand,-2,"wmF",0,NULL,1,1,1,0,0},
294 {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
295 {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
296 {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
297 {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
298 };
299
/* Prototype for evictionPoolAlloc(); its definition is not in this part of
 * the file. NOTE(review): consider moving this prototype to a header. */
struct evictionPoolEntry *evictionPoolAlloc(void);
301
302 /*============================ Utility functions ============================ */
303
304 /* Low level logging. To use only for very big messages, otherwise
305 * serverLog() is to prefer. */
serverLogRaw(int level,const char * msg)306 void serverLogRaw(int level, const char *msg) {
307 const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING };
308 const char *c = ".-*#";
309 FILE *fp;
310 char buf[64];
311 int rawmode = (level & LL_RAW);
312 int log_to_stdout = server.logfile[0] == '\0';
313
314 level &= 0xff; /* clear flags */
315 if (level < server.verbosity) return;
316
317 fp = log_to_stdout ? stdout : fopen(server.logfile,"a");
318 if (!fp) return;
319
320 if (rawmode) {
321 fprintf(fp,"%s",msg);
322 } else {
323 int off;
324 struct timeval tv;
325 int role_char;
326 pid_t pid = getpid();
327
328 gettimeofday(&tv,NULL);
329 off = strftime(buf,sizeof(buf),"%d %b %H:%M:%S.",localtime(&tv.tv_sec));
330 snprintf(buf+off,sizeof(buf)-off,"%03d",(int)tv.tv_usec/1000);
331 if (server.sentinel_mode) {
332 role_char = 'X'; /* Sentinel. */
333 } else if (pid != server.pid) {
334 role_char = 'C'; /* RDB / AOF writing child. */
335 } else {
336 role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */
337 }
338 fprintf(fp,"%d:%c %s %c %s\n",
339 (int)getpid(),role_char, buf,c[level],msg);
340 }
341 fflush(fp);
342
343 if (!log_to_stdout) fclose(fp);
344 if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg);
345 }
346
347 /* Like serverLogRaw() but with printf-alike support. This is the function that
348 * is used across the code. The raw version is only used in order to dump
349 * the INFO output on crash. */
serverLog(int level,const char * fmt,...)350 void serverLog(int level, const char *fmt, ...) {
351 va_list ap;
352 char msg[LOG_MAX_LEN];
353
354 if ((level&0xff) < server.verbosity) return;
355
356 va_start(ap, fmt);
357 vsnprintf(msg, sizeof(msg), fmt, ap);
358 va_end(ap);
359
360 serverLogRaw(level,msg);
361 }
362
363 /* Log a fixed message without printf-alike capabilities, in a way that is
364 * safe to call from a signal handler.
365 *
366 * We actually use this only for signals that are not fatal from the point
367 * of view of Redis. Signals that are going to kill the server anyway and
368 * where we need printf-alike features are served by serverLog(). */
serverLogFromHandler(int level,const char * msg)369 void serverLogFromHandler(int level, const char *msg) {
370 int fd;
371 int log_to_stdout = server.logfile[0] == '\0';
372 char buf[64];
373
374 if ((level&0xff) < server.verbosity || (log_to_stdout && server.daemonize))
375 return;
376 fd = log_to_stdout ? STDOUT_FILENO :
377 open(server.logfile, O_APPEND|O_CREAT|O_WRONLY, 0644);
378 if (fd == -1) return;
379 ll2string(buf,sizeof(buf),getpid());
380 if (write(fd,buf,strlen(buf)) == -1) goto err;
381 if (write(fd,":signal-handler (",17) == -1) goto err;
382 ll2string(buf,sizeof(buf),time(NULL));
383 if (write(fd,buf,strlen(buf)) == -1) goto err;
384 if (write(fd,") ",2) == -1) goto err;
385 if (write(fd,msg,strlen(msg)) == -1) goto err;
386 if (write(fd,"\n",1) == -1) goto err;
387 err:
388 if (!log_to_stdout) close(fd);
389 }
390
/* Current wall-clock time, as read by gettimeofday(), expressed in
 * microseconds since the UNIX epoch. */
long long ustime(void) {
    struct timeval now;

    gettimeofday(&now, NULL);
    return (long long)now.tv_sec * 1000000 + now.tv_usec;
}
401
402 /* Return the UNIX time in milliseconds */
mstime(void)403 mstime_t mstime(void) {
404 return ustime()/1000;
405 }
406
/* Terminate a child process (e.g. after an RDB dump or AOF rewrite).
 *
 * Normally the child leaves through _exit() rather than exit(), because the
 * latter may interact with the same file objects used by the parent
 * process. Coverage builds (COVERAGE_TEST) use exit() instead so that the
 * right coverage information is obtained. */
void exitFromChild(int retcode) {
#ifndef COVERAGE_TEST
    _exit(retcode);
#else
    exit(retcode);
#endif
}
418
419 /*====================== Hash table type implementation ==================== */
420
421 /* This is a hash table type that uses the SDS dynamic strings library as
422 * keys and redis objects as values (objects can hold SDS strings,
423 * lists, sets). */
424
/* dict destructor callback that releases 'val' with zfree() and nothing
 * else; 'privdata' is ignored. */
void dictVanillaFree(void *privdata, void *val)
{
    void *blob = val;

    DICT_NOTUSED(privdata);
    zfree(blob);
}
430
dictListDestructor(void * privdata,void * val)431 void dictListDestructor(void *privdata, void *val)
432 {
433 DICT_NOTUSED(privdata);
434 listRelease((list*)val);
435 }
436
dictSdsKeyCompare(void * privdata,const void * key1,const void * key2)437 int dictSdsKeyCompare(void *privdata, const void *key1,
438 const void *key2)
439 {
440 int l1,l2;
441 DICT_NOTUSED(privdata);
442
443 l1 = sdslen((sds)key1);
444 l2 = sdslen((sds)key2);
445 if (l1 != l2) return 0;
446 return memcmp(key1, key2, l1) == 0;
447 }
448
/* Case-insensitive key comparison used by the command lookup table and
 * other places where a case insensitive, non binary-safe comparison is
 * needed. Compares the keys as NUL-terminated strings via strcasecmp()
 * and returns 1 when equal, 0 otherwise. 'privdata' is ignored. */
int dictSdsKeyCaseCompare(void *privdata, const void *key1,
        const void *key2)
{
    (void)privdata;
    return !strcasecmp(key1, key2);
}
458
/* dict destructor for Redis object keys/values: drops one reference with
 * decrRefCount(). NULL is tolerated because values of swapped out keys are
 * set to NULL. 'privdata' is ignored. */
void dictObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);

    if (val != NULL) decrRefCount(val);
}
466
dictSdsDestructor(void * privdata,void * val)467 void dictSdsDestructor(void *privdata, void *val)
468 {
469 DICT_NOTUSED(privdata);
470
471 sdsfree(val);
472 }
473
dictObjKeyCompare(void * privdata,const void * key1,const void * key2)474 int dictObjKeyCompare(void *privdata, const void *key1,
475 const void *key2)
476 {
477 const robj *o1 = key1, *o2 = key2;
478 return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
479 }
480
dictObjHash(const void * key)481 unsigned int dictObjHash(const void *key) {
482 const robj *o = key;
483 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
484 }
485
/* Case-sensitive hash of an sds key over its full sdslen() bytes. */
unsigned int dictSdsHash(const void *key) {
    const unsigned char *bytes = key;

    return dictGenHashFunction(bytes, sdslen((char*)key));
}
489
/* Case-insensitive hash of an sds key, pairing with
 * dictSdsKeyCaseCompare() in case-insensitive dict types. */
unsigned int dictSdsCaseHash(const void *key) {
    const unsigned char *bytes = key;

    return dictGenCaseHashFunction(bytes, sdslen((char*)key));
}
493
dictEncObjKeyCompare(void * privdata,const void * key1,const void * key2)494 int dictEncObjKeyCompare(void *privdata, const void *key1,
495 const void *key2)
496 {
497 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
498 int cmp;
499
500 if (o1->encoding == OBJ_ENCODING_INT &&
501 o2->encoding == OBJ_ENCODING_INT)
502 return o1->ptr == o2->ptr;
503
504 o1 = getDecodedObject(o1);
505 o2 = getDecodedObject(o2);
506 cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
507 decrRefCount(o1);
508 decrRefCount(o2);
509 return cmp;
510 }
511
dictEncObjHash(const void * key)512 unsigned int dictEncObjHash(const void *key) {
513 robj *o = (robj*) key;
514
515 if (sdsEncodedObject(o)) {
516 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
517 } else {
518 if (o->encoding == OBJ_ENCODING_INT) {
519 char buf[32];
520 int len;
521
522 len = ll2string(buf,32,(long)o->ptr);
523 return dictGenHashFunction((unsigned char*)buf, len);
524 } else {
525 unsigned int hash;
526
527 o = getDecodedObject(o);
528 hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
529 decrRefCount(o);
530 return hash;
531 }
532 }
533 }
534
535 /* Sets type hash table */
536 dictType setDictType = {
537 dictEncObjHash, /* hash function */
538 NULL, /* key dup */
539 NULL, /* val dup */
540 dictEncObjKeyCompare, /* key compare */
541 dictObjectDestructor, /* key destructor */
542 NULL /* val destructor */
543 };
544
545 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */
546 dictType zsetDictType = {
547 dictEncObjHash, /* hash function */
548 NULL, /* key dup */
549 NULL, /* val dup */
550 dictEncObjKeyCompare, /* key compare */
551 dictObjectDestructor, /* key destructor */
552 NULL /* val destructor */
553 };
554
555 /* Db->dict, keys are sds strings, vals are Redis objects. */
556 dictType dbDictType = {
557 dictSdsHash, /* hash function */
558 NULL, /* key dup */
559 NULL, /* val dup */
560 dictSdsKeyCompare, /* key compare */
561 dictSdsDestructor, /* key destructor */
562 dictObjectDestructor /* val destructor */
563 };
564
565 /* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */
566 dictType shaScriptObjectDictType = {
567 dictSdsCaseHash, /* hash function */
568 NULL, /* key dup */
569 NULL, /* val dup */
570 dictSdsKeyCaseCompare, /* key compare */
571 dictSdsDestructor, /* key destructor */
572 dictObjectDestructor /* val destructor */
573 };
574
575 /* Db->expires */
576 dictType keyptrDictType = {
577 dictSdsHash, /* hash function */
578 NULL, /* key dup */
579 NULL, /* val dup */
580 dictSdsKeyCompare, /* key compare */
581 NULL, /* key destructor */
582 NULL /* val destructor */
583 };
584
585 /* Command table. sds string -> command struct pointer. */
586 dictType commandTableDictType = {
587 dictSdsCaseHash, /* hash function */
588 NULL, /* key dup */
589 NULL, /* val dup */
590 dictSdsKeyCaseCompare, /* key compare */
591 dictSdsDestructor, /* key destructor */
592 NULL /* val destructor */
593 };
594
595 /* Hash type hash table (note that small hashes are represented with ziplists) */
596 dictType hashDictType = {
597 dictEncObjHash, /* hash function */
598 NULL, /* key dup */
599 NULL, /* val dup */
600 dictEncObjKeyCompare, /* key compare */
601 dictObjectDestructor, /* key destructor */
602 dictObjectDestructor /* val destructor */
603 };
604
605 /* Keylist hash table type has unencoded redis objects as keys and
606 * lists as values. It's used for blocking operations (BLPOP) and to
607 * map swapped keys to a list of clients waiting for this keys to be loaded. */
608 dictType keylistDictType = {
609 dictObjHash, /* hash function */
610 NULL, /* key dup */
611 NULL, /* val dup */
612 dictObjKeyCompare, /* key compare */
613 dictObjectDestructor, /* key destructor */
614 dictListDestructor /* val destructor */
615 };
616
617 /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
618 * clusterNode structures. */
619 dictType clusterNodesDictType = {
620 dictSdsHash, /* hash function */
621 NULL, /* key dup */
622 NULL, /* val dup */
623 dictSdsKeyCompare, /* key compare */
624 dictSdsDestructor, /* key destructor */
625 NULL /* val destructor */
626 };
627
628 /* Cluster re-addition blacklist. This maps node IDs to the time
629 * we can re-add this node. The goal is to avoid readding a removed
630 * node for some time. */
631 dictType clusterNodesBlackListDictType = {
632 dictSdsCaseHash, /* hash function */
633 NULL, /* key dup */
634 NULL, /* val dup */
635 dictSdsKeyCaseCompare, /* key compare */
636 dictSdsDestructor, /* key destructor */
637 NULL /* val destructor */
638 };
639
640 /* Migrate cache dict type. */
641 dictType migrateCacheDictType = {
642 dictSdsHash, /* hash function */
643 NULL, /* key dup */
644 NULL, /* val dup */
645 dictSdsKeyCompare, /* key compare */
646 dictSdsDestructor, /* key destructor */
647 NULL /* val destructor */
648 };
649
650 /* Replication cached script dict (server.repl_scriptcache_dict).
651 * Keys are sds SHA1 strings, while values are not used at all in the current
652 * implementation. */
653 dictType replScriptCacheDictType = {
654 dictSdsCaseHash, /* hash function */
655 NULL, /* key dup */
656 NULL, /* val dup */
657 dictSdsKeyCaseCompare, /* key compare */
658 dictSdsDestructor, /* key destructor */
659 NULL /* val destructor */
660 };
661
/* Return 1 when 'dict' is larger than DICT_HT_INITIAL_SIZE slots but fewer
 * than HASHTABLE_MIN_FILL percent of those slots are used, i.e. when
 * shrinking it would save memory; 0 otherwise. */
int htNeedsResize(dict *dict) {
    long long slots = dictSlots(dict);
    long long elements = dictSize(dict);

    if (slots <= DICT_HT_INITIAL_SIZE) return 0;
    return elements*100/slots < HASHTABLE_MIN_FILL;
}
670
671 /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
672 * we resize the hash table to save memory */
tryResizeHashTables(int dbid)673 void tryResizeHashTables(int dbid) {
674 if (htNeedsResize(server.db[dbid].dict))
675 dictResize(server.db[dbid].dict);
676 if (htNeedsResize(server.db[dbid].expires))
677 dictResize(server.db[dbid].expires);
678 }
679
680 /* Our hash table implementation performs rehashing incrementally while
681 * we write/read from the hash table. Still if the server is idle, the hash
682 * table will use two tables for a long time. So we try to use 1 millisecond
683 * of CPU time at every call of this function to perform some rehahsing.
684 *
685 * The function returns 1 if some rehashing was performed, otherwise 0
686 * is returned. */
incrementallyRehash(int dbid)687 int incrementallyRehash(int dbid) {
688 /* Keys dictionary */
689 if (dictIsRehashing(server.db[dbid].dict)) {
690 dictRehashMilliseconds(server.db[dbid].dict,1);
691 return 1; /* already used our millisecond for this loop... */
692 }
693 /* Expires */
694 if (dictIsRehashing(server.db[dbid].expires)) {
695 dictRehashMilliseconds(server.db[dbid].expires,1);
696 return 1; /* already used our millisecond for this loop... */
697 }
698 return 0;
699 }
700
701 /* This function is called once a background process of some kind terminates,
702 * as we want to avoid resizing the hash tables when there is a child in order
703 * to play well with copy-on-write (otherwise when a resize happens lots of
704 * memory pages are copied). The goal of this function is to update the ability
705 * for dict.c to resize the hash tables accordingly to the fact we have o not
706 * running childs. */
updateDictResizePolicy(void)707 void updateDictResizePolicy(void) {
708 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
709 dictEnableResize();
710 else
711 dictDisableResize();
712 }
713
714 /* ======================= Cron: called every 100 ms ======================== */
715
716 /* Helper function for the activeExpireCycle() function.
717 * This function will try to expire the key that is stored in the hash table
718 * entry 'de' of the 'expires' hash table of a Redis database.
719 *
720 * If the key is found to be expired, it is removed from the database and
721 * 1 is returned. Otherwise no operation is performed and 0 is returned.
722 *
723 * When a key is expired, server.stat_expiredkeys is incremented.
724 *
725 * The parameter 'now' is the current time in milliseconds as is passed
726 * to the function to avoid too many gettimeofday() syscalls. */
activeExpireCycleTryExpire(redisDb * db,dictEntry * de,long long now)727 int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
728 long long t = dictGetSignedIntegerVal(de);
729 if (now > t) {
730 sds key = dictGetKey(de);
731 robj *keyobj = createStringObject(key,sdslen(key));
732
733 propagateExpire(db,keyobj);
734 dbDelete(db,keyobj);
735 notifyKeyspaceEvent(NOTIFY_EXPIRED,
736 "expired",keyobj,db->id);
737 decrRefCount(keyobj);
738 server.stat_expiredkeys++;
739 return 1;
740 } else {
741 return 0;
742 }
743 }
744
745 /* Try to expire a few timed out keys. The algorithm used is adaptive and
746 * will use few CPU cycles if there are few expiring keys, otherwise
747 * it will get more aggressive to avoid that too much memory is used by
748 * keys that can be removed from the keyspace.
749 *
750 * No more than CRON_DBS_PER_CALL databases are tested at every
751 * iteration.
752 *
753 * This kind of call is used when Redis detects that timelimit_exit is
754 * true, so there is more work to do, and we do it more incrementally from
755 * the beforeSleep() function of the event loop.
756 *
757 * Expire cycle type:
758 *
 * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
 * "fast" expire cycle that takes no longer than ACTIVE_EXPIRE_CYCLE_FAST_DURATION
 * microseconds, and a new fast cycle is not started until at least
 * 2*ACTIVE_EXPIRE_CYCLE_FAST_DURATION microseconds after the previous one started.
 *
 * If type is ACTIVE_EXPIRE_CYCLE_SLOW, the normal expire cycle is
 * executed, where the time limit is a percentage of the server.hz period
 * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */
766
activeExpireCycle(int type)767 void activeExpireCycle(int type) {
768 /* This function has some global state in order to continue the work
769 * incrementally across calls. */
770 static unsigned int current_db = 0; /* Last DB tested. */
771 static int timelimit_exit = 0; /* Time limit hit in previous call? */
772 static long long last_fast_cycle = 0; /* When last fast cycle ran. */
773
774 int j, iteration = 0;
775 int dbs_per_call = CRON_DBS_PER_CALL;
776 long long start = ustime(), timelimit;
777
778 if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
779 /* Don't start a fast cycle if the previous cycle did not exited
780 * for time limt. Also don't repeat a fast cycle for the same period
781 * as the fast cycle total duration itself. */
782 if (!timelimit_exit) return;
783 if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
784 last_fast_cycle = start;
785 }
786
787 /* We usually should test CRON_DBS_PER_CALL per iteration, with
788 * two exceptions:
789 *
790 * 1) Don't test more DBs than we have.
791 * 2) If last time we hit the time limit, we want to scan all DBs
792 * in this iteration, as there is work to do in some DB and we don't want
793 * expired keys to use memory for too much time. */
794 if (dbs_per_call > server.dbnum || timelimit_exit)
795 dbs_per_call = server.dbnum;
796
797 /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
798 * per iteration. Since this function gets called with a frequency of
799 * server.hz times per second, the following is the max amount of
800 * microseconds we can spend in this function. */
801 timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
802 timelimit_exit = 0;
803 if (timelimit <= 0) timelimit = 1;
804
805 if (type == ACTIVE_EXPIRE_CYCLE_FAST)
806 timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */
807
808 for (j = 0; j < dbs_per_call; j++) {
809 int expired;
810 redisDb *db = server.db+(current_db % server.dbnum);
811
812 /* Increment the DB now so we are sure if we run out of time
813 * in the current DB we'll restart from the next. This allows to
814 * distribute the time evenly across DBs. */
815 current_db++;
816
817 /* Continue to expire if at the end of the cycle more than 25%
818 * of the keys were expired. */
819 do {
820 unsigned long num, slots;
821 long long now, ttl_sum;
822 int ttl_samples;
823
824 /* If there is nothing to expire try next DB ASAP. */
825 if ((num = dictSize(db->expires)) == 0) {
826 db->avg_ttl = 0;
827 break;
828 }
829 slots = dictSlots(db->expires);
830 now = mstime();
831
832 /* When there are less than 1% filled slots getting random
833 * keys is expensive, so stop here waiting for better times...
834 * The dictionary will be resized asap. */
835 if (num && slots > DICT_HT_INITIAL_SIZE &&
836 (num*100/slots < 1)) break;
837
838 /* The main collection cycle. Sample random keys among keys
839 * with an expire set, checking for expired ones. */
840 expired = 0;
841 ttl_sum = 0;
842 ttl_samples = 0;
843
844 if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
845 num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
846
847 while (num--) {
848 dictEntry *de;
849 long long ttl;
850
851 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
852 ttl = dictGetSignedIntegerVal(de)-now;
853 if (activeExpireCycleTryExpire(db,de,now)) expired++;
854 if (ttl > 0) {
855 /* We want the average TTL of keys yet not expired. */
856 ttl_sum += ttl;
857 ttl_samples++;
858 }
859 }
860
861 /* Update the average TTL stats for this database. */
862 if (ttl_samples) {
863 long long avg_ttl = ttl_sum/ttl_samples;
864
865 /* Do a simple running average with a few samples.
866 * We just use the current estimate with a weight of 2%
867 * and the previous estimate with a weight of 98%. */
868 if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
869 db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
870 }
871
872 /* We can't block forever here even if there are many keys to
873 * expire. So after a given amount of milliseconds return to the
874 * caller waiting for the other active expire cycle. */
875 iteration++;
876 if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
877 long long elapsed = ustime()-start;
878
879 latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
880 if (elapsed > timelimit) timelimit_exit = 1;
881 }
882 if (timelimit_exit) return;
883 /* We don't repeat the cycle if there are less than 25% of keys
884 * found expired in the current DB. */
885 } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
886 }
887 }
888
getLRUClock(void)889 unsigned int getLRUClock(void) {
890 return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
891 }
892
893 /* Add a sample to the operations per second array of samples. */
trackInstantaneousMetric(int metric,long long current_reading)894 void trackInstantaneousMetric(int metric, long long current_reading) {
895 long long t = mstime() - server.inst_metric[metric].last_sample_time;
896 long long ops = current_reading -
897 server.inst_metric[metric].last_sample_count;
898 long long ops_sec;
899
900 ops_sec = t > 0 ? (ops*1000/t) : 0;
901
902 server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
903 ops_sec;
904 server.inst_metric[metric].idx++;
905 server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
906 server.inst_metric[metric].last_sample_time = mstime();
907 server.inst_metric[metric].last_sample_count = current_reading;
908 }
909
910 /* Return the mean of all the samples. */
getInstantaneousMetric(int metric)911 long long getInstantaneousMetric(int metric) {
912 int j;
913 long long sum = 0;
914
915 for (j = 0; j < STATS_METRIC_SAMPLES; j++)
916 sum += server.inst_metric[metric].samples[j];
917 return sum / STATS_METRIC_SAMPLES;
918 }
919
920 /* Check for timeouts. Returns non-zero if the client was terminated.
921 * The function gets the current time in milliseconds as argument since
922 * it gets called multiple times in a loop, so calling gettimeofday() for
923 * each iteration would be costly without any actual gain. */
clientsCronHandleTimeout(client * c,mstime_t now_ms)924 int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
925 time_t now = now_ms/1000;
926
927 if (server.maxidletime &&
928 !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */
929 !(c->flags & CLIENT_MASTER) && /* no timeout for masters */
930 !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */
931 !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */
932 (now - c->lastinteraction > server.maxidletime))
933 {
934 serverLog(LL_VERBOSE,"Closing idle client");
935 freeClient(c);
936 return 1;
937 } else if (c->flags & CLIENT_BLOCKED) {
938 /* Blocked OPS timeout is handled with milliseconds resolution.
939 * However note that the actual resolution is limited by
940 * server.hz. */
941
942 if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
943 /* Handle blocking operation specific timeout. */
944 replyToBlockedClientTimedOut(c);
945 unblockClient(c);
946 } else if (server.cluster_enabled) {
947 /* Cluster: handle unblock & redirect of clients blocked
948 * into keys no longer served by this server. */
949 if (clusterRedirectBlockedClientIfNeeded(c))
950 unblockClient(c);
951 }
952 }
953 return 0;
954 }
955
956 /* The client query buffer is an sds.c string that can end with a lot of
957 * free space not used, this function reclaims space if needed.
958 *
959 * The function always returns 0 as it never terminates the client. */
clientsCronResizeQueryBuffer(client * c)960 int clientsCronResizeQueryBuffer(client *c) {
961 size_t querybuf_size = sdsAllocSize(c->querybuf);
962 time_t idletime = server.unixtime - c->lastinteraction;
963
964 /* There are two conditions to resize the query buffer:
965 * 1) Query buffer is > BIG_ARG and too big for latest peak.
966 * 2) Client is inactive and the buffer is bigger than 1k. */
967 if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
968 (querybuf_size/(c->querybuf_peak+1)) > 2) ||
969 (querybuf_size > 1024 && idletime > 2))
970 {
971 /* Only resize the query buffer if it is actually wasting space. */
972 if (sdsavail(c->querybuf) > 1024) {
973 c->querybuf = sdsRemoveFreeSpace(c->querybuf);
974 }
975 }
976 /* Reset the peak again to capture the peak memory usage in the next
977 * cycle. */
978 c->querybuf_peak = 0;
979 return 0;
980 }
981
982 #define CLIENTS_CRON_MIN_ITERATIONS 5
clientsCron(void)983 void clientsCron(void) {
984 /* Make sure to process at least numclients/server.hz of clients
985 * per call. Since this function is called server.hz times per second
986 * we are sure that in the worst case we process all the clients in 1
987 * second. */
988 int numclients = listLength(server.clients);
989 int iterations = numclients/server.hz;
990 mstime_t now = mstime();
991
992 /* Process at least a few clients while we are at it, even if we need
993 * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
994 * of processing each client once per second. */
995 if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
996 iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
997 numclients : CLIENTS_CRON_MIN_ITERATIONS;
998
999 while(listLength(server.clients) && iterations--) {
1000 client *c;
1001 listNode *head;
1002
1003 /* Rotate the list, take the current head, process.
1004 * This way if the client must be removed from the list it's the
1005 * first element and we don't incur into O(N) computation. */
1006 listRotate(server.clients);
1007 head = listFirst(server.clients);
1008 c = listNodeValue(head);
1009 /* The following functions do different service checks on the client.
1010 * The protocol is that they return non-zero if the client was
1011 * terminated. */
1012 if (clientsCronHandleTimeout(c,now)) continue;
1013 if (clientsCronResizeQueryBuffer(c)) continue;
1014 }
1015 }
1016
1017 /* This function handles 'background' operations we are required to do
1018 * incrementally in Redis databases, such as active key expiring, resizing,
1019 * rehashing. */
databasesCron(void)1020 void databasesCron(void) {
1021 /* Expire keys by random sampling. Not required for slaves
1022 * as master will synthesize DELs for us. */
1023 if (server.active_expire_enabled && server.masterhost == NULL)
1024 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
1025
1026 /* Perform hash tables rehashing if needed, but only if there are no
1027 * other processes saving the DB on disk. Otherwise rehashing is bad
1028 * as will cause a lot of copy-on-write of memory pages. */
1029 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
1030 /* We use global counters so if we stop the computation at a given
1031 * DB we'll be able to start from the successive in the next
1032 * cron loop iteration. */
1033 static unsigned int resize_db = 0;
1034 static unsigned int rehash_db = 0;
1035 int dbs_per_call = CRON_DBS_PER_CALL;
1036 int j;
1037
1038 /* Don't test more DBs than we have. */
1039 if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
1040
1041 /* Resize */
1042 for (j = 0; j < dbs_per_call; j++) {
1043 tryResizeHashTables(resize_db % server.dbnum);
1044 resize_db++;
1045 }
1046
1047 /* Rehash */
1048 if (server.activerehashing) {
1049 for (j = 0; j < dbs_per_call; j++) {
1050 int work_done = incrementallyRehash(rehash_db % server.dbnum);
1051 rehash_db++;
1052 if (work_done) {
1053 /* If the function did some work, stop here, we'll do
1054 * more at the next cron loop. */
1055 break;
1056 }
1057 }
1058 }
1059 }
1060 }
1061
1062 /* We take a cached value of the unix time in the global state because with
1063 * virtual memory and aging there is to store the current time in objects at
1064 * every object access, and accuracy is not needed. To access a global var is
1065 * a lot faster than calling time(NULL) */
updateCachedTime(void)1066 void updateCachedTime(void) {
1067 server.unixtime = time(NULL);
1068 server.mstime = mstime();
1069 }
1070
1071 /* This is our timer interrupt, called server.hz times per second.
1072 * Here is where we do a number of things that need to be done asynchronously.
1073 * For instance:
1074 *
1075 * - Active expired keys collection (it is also performed in a lazy way on
1076 * lookup).
1077 * - Software watchdog.
1078 * - Update some statistic.
1079 * - Incremental rehashing of the DBs hash tables.
1080 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
1081 * - Clients timeout of different kinds.
1082 * - Replication reconnection.
1083 * - Many more...
1084 *
1085 * Everything directly called here will be called server.hz times per second,
1086 * so in order to throttle execution of things we want to do less frequently
1087 * a macro is used: run_with_period(milliseconds) { .... }
1088 */
1089
serverCron(struct aeEventLoop * eventLoop,long long id,void * clientData)1090 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1091 int j;
1092 UNUSED(eventLoop);
1093 UNUSED(id);
1094 UNUSED(clientData);
1095
1096 /* Software watchdog: deliver the SIGALRM that will reach the signal
1097 * handler if we don't return here fast enough. */
1098 if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
1099
1100 /* Update the time cache. */
1101 updateCachedTime();
1102
1103 run_with_period(100) {
1104 trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
1105 trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
1106 server.stat_net_input_bytes);
1107 trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
1108 server.stat_net_output_bytes);
1109 }
1110
1111 /* We have just LRU_BITS bits per object for LRU information.
1112 * So we use an (eventually wrapping) LRU clock.
1113 *
1114 * Note that even if the counter wraps it's not a big problem,
1115 * everything will still work but some object will appear younger
1116 * to Redis. However for this to happen a given object should never be
1117 * touched for all the time needed to the counter to wrap, which is
1118 * not likely.
1119 *
1120 * Note that you can change the resolution altering the
1121 * LRU_CLOCK_RESOLUTION define. */
1122 server.lruclock = getLRUClock();
1123
1124 /* Record the max memory used since the server was started. */
1125 if (zmalloc_used_memory() > server.stat_peak_memory)
1126 server.stat_peak_memory = zmalloc_used_memory();
1127
1128 /* Sample the RSS here since this is a relatively slow call. */
1129 server.resident_set_size = zmalloc_get_rss();
1130
1131 /* We received a SIGTERM, shutting down here in a safe way, as it is
1132 * not ok doing so inside the signal handler. */
1133 if (server.shutdown_asap) {
1134 if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
1135 serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
1136 server.shutdown_asap = 0;
1137 }
1138
1139 /* Show some info about non-empty databases */
1140 run_with_period(5000) {
1141 for (j = 0; j < server.dbnum; j++) {
1142 long long size, used, vkeys;
1143
1144 size = dictSlots(server.db[j].dict);
1145 used = dictSize(server.db[j].dict);
1146 vkeys = dictSize(server.db[j].expires);
1147 if (used || vkeys) {
1148 serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1149 /* dictPrintStats(server.dict); */
1150 }
1151 }
1152 }
1153
1154 /* Show information about connected clients */
1155 if (!server.sentinel_mode) {
1156 run_with_period(5000) {
1157 serverLog(LL_VERBOSE,
1158 "%lu clients connected (%lu slaves), %zu bytes in use",
1159 listLength(server.clients)-listLength(server.slaves),
1160 listLength(server.slaves),
1161 zmalloc_used_memory());
1162 }
1163 }
1164
1165 /* We need to do a few operations on clients asynchronously. */
1166 clientsCron();
1167
1168 /* Handle background operations on Redis databases. */
1169 databasesCron();
1170
1171 /* Start a scheduled AOF rewrite if this was requested by the user while
1172 * a BGSAVE was in progress. */
1173 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
1174 server.aof_rewrite_scheduled)
1175 {
1176 rewriteAppendOnlyFileBackground();
1177 }
1178
1179 /* Check if a background saving or AOF rewrite in progress terminated. */
1180 if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
1181 ldbPendingChildren())
1182 {
1183 int statloc;
1184 pid_t pid;
1185
1186 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1187 int exitcode = WEXITSTATUS(statloc);
1188 int bysignal = 0;
1189
1190 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
1191
1192 if (pid == -1) {
1193 serverLog(LL_WARNING,"wait3() returned an error: %s. "
1194 "rdb_child_pid = %d, aof_child_pid = %d",
1195 strerror(errno),
1196 (int) server.rdb_child_pid,
1197 (int) server.aof_child_pid);
1198 } else if (pid == server.rdb_child_pid) {
1199 backgroundSaveDoneHandler(exitcode,bysignal);
1200 } else if (pid == server.aof_child_pid) {
1201 backgroundRewriteDoneHandler(exitcode,bysignal);
1202 } else {
1203 if (!ldbRemoveChild(pid)) {
1204 serverLog(LL_WARNING,
1205 "Warning, detected child with unmatched pid: %ld",
1206 (long)pid);
1207 }
1208 }
1209 updateDictResizePolicy();
1210 }
1211 } else {
1212 /* If there is not a background saving/rewrite in progress check if
1213 * we have to save/rewrite now */
1214 for (j = 0; j < server.saveparamslen; j++) {
1215 struct saveparam *sp = server.saveparams+j;
1216
1217 /* Save if we reached the given amount of changes,
1218 * the given amount of seconds, and if the latest bgsave was
1219 * successful or if, in case of an error, at least
1220 * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
1221 if (server.dirty >= sp->changes &&
1222 server.unixtime-server.lastsave > sp->seconds &&
1223 (server.unixtime-server.lastbgsave_try >
1224 CONFIG_BGSAVE_RETRY_DELAY ||
1225 server.lastbgsave_status == C_OK))
1226 {
1227 serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
1228 sp->changes, (int)sp->seconds);
1229 rdbSaveBackground(server.rdb_filename);
1230 break;
1231 }
1232 }
1233
1234 /* Trigger an AOF rewrite if needed */
1235 if (server.rdb_child_pid == -1 &&
1236 server.aof_child_pid == -1 &&
1237 server.aof_rewrite_perc &&
1238 server.aof_current_size > server.aof_rewrite_min_size)
1239 {
1240 long long base = server.aof_rewrite_base_size ?
1241 server.aof_rewrite_base_size : 1;
1242 long long growth = (server.aof_current_size*100/base) - 100;
1243 if (growth >= server.aof_rewrite_perc) {
1244 serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
1245 rewriteAppendOnlyFileBackground();
1246 }
1247 }
1248 }
1249
1250
1251 /* AOF postponed flush: Try at every cron cycle if the slow fsync
1252 * completed. */
1253 if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
1254
1255 /* AOF write errors: in this case we have a buffer to flush as well and
1256 * clear the AOF error in case of success to make the DB writable again,
1257 * however to try every second is enough in case of 'hz' is set to
1258 * an higher frequency. */
1259 run_with_period(1000) {
1260 if (server.aof_last_write_status == C_ERR)
1261 flushAppendOnlyFile(0);
1262 }
1263
1264 /* Close clients that need to be closed asynchronous */
1265 freeClientsInAsyncFreeQueue();
1266
1267 /* Clear the paused clients flag if needed. */
1268 clientsArePaused(); /* Don't check return value, just use the side effect. */
1269
1270 /* Replication cron function -- used to reconnect to master,
1271 * detect transfer failures, start background RDB transfers and so forth. */
1272 run_with_period(1000) replicationCron();
1273
1274 /* Run the Redis Cluster cron. */
1275 run_with_period(100) {
1276 if (server.cluster_enabled) clusterCron();
1277 }
1278
1279 /* Run the Sentinel timer if we are in sentinel mode. */
1280 run_with_period(100) {
1281 if (server.sentinel_mode) sentinelTimer();
1282 }
1283
1284 /* Cleanup expired MIGRATE cached sockets. */
1285 run_with_period(1000) {
1286 migrateCloseTimedoutSockets();
1287 }
1288
1289 /* Start a scheduled BGSAVE if the corresponding flag is set. This is
1290 * useful when we are forced to postpone a BGSAVE because an AOF
1291 * rewrite is in progress.
1292 *
1293 * Note: this code must be after the replicationCron() call above so
1294 * make sure when refactoring this file to keep this order. This is useful
1295 * because we want to give priority to RDB savings for replication. */
1296 if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
1297 server.rdb_bgsave_scheduled &&
1298 (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
1299 server.lastbgsave_status == C_OK))
1300 {
1301 if (rdbSaveBackground(server.rdb_filename) == C_OK)
1302 server.rdb_bgsave_scheduled = 0;
1303 }
1304
1305 server.cronloops++;
1306 return 1000/server.hz;
1307 }
1308
1309 /* This function gets called every time Redis is entering the
1310 * main loop of the event driven library, that is, before to sleep
1311 * for ready file descriptors. */
beforeSleep(struct aeEventLoop * eventLoop)1312 void beforeSleep(struct aeEventLoop *eventLoop) {
1313 UNUSED(eventLoop);
1314
1315 /* Call the Redis Cluster before sleep function. Note that this function
1316 * may change the state of Redis Cluster (from ok to fail or vice versa),
1317 * so it's a good idea to call it before serving the unblocked clients
1318 * later in this function. */
1319 if (server.cluster_enabled) clusterBeforeSleep();
1320
1321 /* Run a fast expire cycle (the called function will return
1322 * ASAP if a fast cycle is not needed). */
1323 if (server.active_expire_enabled && server.masterhost == NULL)
1324 activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
1325
1326 /* Send all the slaves an ACK request if at least one client blocked
1327 * during the previous event loop iteration. */
1328 if (server.get_ack_from_slaves) {
1329 robj *argv[3];
1330
1331 argv[0] = createStringObject("REPLCONF",8);
1332 argv[1] = createStringObject("GETACK",6);
1333 argv[2] = createStringObject("*",1); /* Not used argument. */
1334 replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
1335 decrRefCount(argv[0]);
1336 decrRefCount(argv[1]);
1337 decrRefCount(argv[2]);
1338 server.get_ack_from_slaves = 0;
1339 }
1340
1341 /* Unblock all the clients blocked for synchronous replication
1342 * in WAIT. */
1343 if (listLength(server.clients_waiting_acks))
1344 processClientsWaitingReplicas();
1345
1346 /* Try to process pending commands for clients that were just unblocked. */
1347 if (listLength(server.unblocked_clients))
1348 processUnblockedClients();
1349
1350 /* Write the AOF buffer on disk */
1351 flushAppendOnlyFile(0);
1352
1353 /* Handle writes with pending output buffers. */
1354 handleClientsWithPendingWrites();
1355 }
1356
1357 /* =========================== Server initialization ======================== */
1358
createSharedObjects(void)1359 void createSharedObjects(void) {
1360 int j;
1361
1362 shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
1363 shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
1364 shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
1365 shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
1366 shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
1367 shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
1368 shared.cnegone = createObject(OBJ_STRING,sdsnew(":-1\r\n"));
1369 shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
1370 shared.nullmultibulk = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
1371 shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n"));
1372 shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
1373 shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
1374 shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
1375 shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
1376 "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
1377 shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
1378 "-ERR no such key\r\n"));
1379 shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
1380 "-ERR syntax error\r\n"));
1381 shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
1382 "-ERR source and destination objects are the same\r\n"));
1383 shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
1384 "-ERR index out of range\r\n"));
1385 shared.noscripterr = createObject(OBJ_STRING,sdsnew(
1386 "-NOSCRIPT No matching script. Please use EVAL.\r\n"));
1387 shared.loadingerr = createObject(OBJ_STRING,sdsnew(
1388 "-LOADING Redis is loading the dataset in memory\r\n"));
1389 shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
1390 "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
1391 shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
1392 "-MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n"));
1393 shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
1394 "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n"));
1395 shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
1396 "-READONLY You can't write against a read only slave.\r\n"));
1397 shared.noautherr = createObject(OBJ_STRING,sdsnew(
1398 "-NOAUTH Authentication required.\r\n"));
1399 shared.oomerr = createObject(OBJ_STRING,sdsnew(
1400 "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
1401 shared.execaborterr = createObject(OBJ_STRING,sdsnew(
1402 "-EXECABORT Transaction discarded because of previous errors.\r\n"));
1403 shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
1404 "-NOREPLICAS Not enough good slaves to write.\r\n"));
1405 shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
1406 "-BUSYKEY Target key name already exists.\r\n"));
1407 shared.space = createObject(OBJ_STRING,sdsnew(" "));
1408 shared.colon = createObject(OBJ_STRING,sdsnew(":"));
1409 shared.plus = createObject(OBJ_STRING,sdsnew("+"));
1410
1411 for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
1412 char dictid_str[64];
1413 int dictid_len;
1414
1415 dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
1416 shared.select[j] = createObject(OBJ_STRING,
1417 sdscatprintf(sdsempty(),
1418 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
1419 dictid_len, dictid_str));
1420 }
1421 shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
1422 shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
1423 shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
1424 shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
1425 shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
1426 shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
1427 shared.del = createStringObject("DEL",3);
1428 shared.rpop = createStringObject("RPOP",4);
1429 shared.lpop = createStringObject("LPOP",4);
1430 shared.lpush = createStringObject("LPUSH",5);
1431 for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
1432 shared.integers[j] = createObject(OBJ_STRING,(void*)(long)j);
1433 shared.integers[j]->encoding = OBJ_ENCODING_INT;
1434 }
1435 for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j++) {
1436 shared.mbulkhdr[j] = createObject(OBJ_STRING,
1437 sdscatprintf(sdsempty(),"*%d\r\n",j));
1438 shared.bulkhdr[j] = createObject(OBJ_STRING,
1439 sdscatprintf(sdsempty(),"$%d\r\n",j));
1440 }
1441 /* The following two shared objects, minstring and maxstrings, are not
1442 * actually used for their value but as a special object meaning
1443 * respectively the minimum possible string and the maximum possible
1444 * string in string comparisons for the ZRANGEBYLEX command. */
1445 shared.minstring = createStringObject("minstring",9);
1446 shared.maxstring = createStringObject("maxstring",9);
1447 }
1448
initServerConfig(void)1449 void initServerConfig(void) {
1450 int j;
1451
1452 getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
1453 server.configfile = NULL;
1454 server.executable = NULL;
1455 server.hz = CONFIG_DEFAULT_HZ;
1456 server.runid[CONFIG_RUN_ID_SIZE] = '\0';
1457 server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
1458 server.port = CONFIG_DEFAULT_SERVER_PORT;
1459 server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
1460 server.bindaddr_count = 0;
1461 server.unixsocket = NULL;
1462 server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
1463 server.ipfd_count = 0;
1464 server.sofd = -1;
1465 server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE;
1466 server.dbnum = CONFIG_DEFAULT_DBNUM;
1467 server.verbosity = CONFIG_DEFAULT_VERBOSITY;
1468 server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
1469 server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE;
1470 server.active_expire_enabled = 1;
1471 server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
1472 server.saveparams = NULL;
1473 server.loading = 0;
1474 server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);
1475 server.syslog_enabled = CONFIG_DEFAULT_SYSLOG_ENABLED;
1476 server.syslog_ident = zstrdup(CONFIG_DEFAULT_SYSLOG_IDENT);
1477 server.syslog_facility = LOG_LOCAL0;
1478 server.daemonize = CONFIG_DEFAULT_DAEMONIZE;
1479 server.supervised = 0;
1480 server.supervised_mode = SUPERVISED_NONE;
1481 server.aof_state = AOF_OFF;
1482 server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC;
1483 server.aof_no_fsync_on_rewrite = CONFIG_DEFAULT_AOF_NO_FSYNC_ON_REWRITE;
1484 server.aof_rewrite_perc = AOF_REWRITE_PERC;
1485 server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
1486 server.aof_rewrite_base_size = 0;
1487 server.aof_rewrite_scheduled = 0;
1488 server.aof_last_fsync = time(NULL);
1489 server.aof_rewrite_time_last = -1;
1490 server.aof_rewrite_time_start = -1;
1491 server.aof_lastbgrewrite_status = C_OK;
1492 server.aof_delayed_fsync = 0;
1493 server.aof_fd = -1;
1494 server.aof_selected_db = -1; /* Make sure the first time will not match */
1495 server.aof_flush_postponed_start = 0;
1496 server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC;
1497 server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED;
1498 server.pidfile = NULL;
1499 server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME);
1500 server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME);
1501 server.requirepass = NULL;
1502 server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION;
1503 server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM;
1504 server.stop_writes_on_bgsave_err = CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR;
1505 server.activerehashing = CONFIG_DEFAULT_ACTIVE_REHASHING;
1506 server.notify_keyspace_events = 0;
1507 server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS;
1508 server.bpop_blocked_clients = 0;
1509 server.maxmemory = CONFIG_DEFAULT_MAXMEMORY;
1510 server.maxmemory_policy = CONFIG_DEFAULT_MAXMEMORY_POLICY;
1511 server.maxmemory_samples = CONFIG_DEFAULT_MAXMEMORY_SAMPLES;
1512 server.hash_max_ziplist_entries = OBJ_HASH_MAX_ZIPLIST_ENTRIES;
1513 server.hash_max_ziplist_value = OBJ_HASH_MAX_ZIPLIST_VALUE;
1514 server.list_max_ziplist_size = OBJ_LIST_MAX_ZIPLIST_SIZE;
1515 server.list_compress_depth = OBJ_LIST_COMPRESS_DEPTH;
1516 server.set_max_intset_entries = OBJ_SET_MAX_INTSET_ENTRIES;
1517 server.zset_max_ziplist_entries = OBJ_ZSET_MAX_ZIPLIST_ENTRIES;
1518 server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE;
1519 server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES;
1520 server.shutdown_asap = 0;
1521 server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
1522 server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
1523 server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE;
1524 server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG;
1525 server.cluster_enabled = 0;
1526 server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT;
1527 server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER;
1528 server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY;
1529 server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE;
1530 server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
1531 server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
1532 server.next_client_id = 1; /* Client IDs, start from 1 .*/
1533 server.loading_process_events_interval_bytes = (1024*1024*2);
1534
1535 server.lruclock = getLRUClock();
1536 resetServerSaveParams();
1537
1538 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1539 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1540 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1541 /* Replication related */
1542 server.masterauth = NULL;
1543 server.masterhost = NULL;
1544 server.masterport = 6379;
1545 server.master = NULL;
1546 server.cached_master = NULL;
1547 server.repl_master_initial_offset = -1;
1548 server.repl_state = REPL_STATE_NONE;
1549 server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
1550 server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
1551 server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
1552 server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
1553 server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
1554 server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
1555 server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
1556 server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY;
1557 server.slave_announce_ip = CONFIG_DEFAULT_SLAVE_ANNOUNCE_IP;
1558 server.slave_announce_port = CONFIG_DEFAULT_SLAVE_ANNOUNCE_PORT;
1559 server.master_repl_offset = 0;
1560
1561 /* Replication partial resync backlog */
1562 server.repl_backlog = NULL;
1563 server.repl_backlog_size = CONFIG_DEFAULT_REPL_BACKLOG_SIZE;
1564 server.repl_backlog_histlen = 0;
1565 server.repl_backlog_idx = 0;
1566 server.repl_backlog_off = 0;
1567 server.repl_backlog_time_limit = CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT;
1568 server.repl_no_slaves_since = time(NULL);
1569
1570 /* Client output buffer limits */
1571 for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
1572 server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
1573
1574 /* Double constants initialization */
1575 R_Zero = 0.0;
1576 R_PosInf = 1.0/R_Zero;
1577 R_NegInf = -1.0/R_Zero;
1578 R_Nan = R_Zero/R_Zero;
1579
1580 /* Command table -- we initiialize it here as it is part of the
1581 * initial configuration, since command names may be changed via
1582 * redis.conf using the rename-command directive. */
1583 server.commands = dictCreate(&commandTableDictType,NULL);
1584 server.orig_commands = dictCreate(&commandTableDictType,NULL);
1585 populateCommandTable();
1586 server.delCommand = lookupCommandByCString("del");
1587 server.multiCommand = lookupCommandByCString("multi");
1588 server.lpushCommand = lookupCommandByCString("lpush");
1589 server.lpopCommand = lookupCommandByCString("lpop");
1590 server.rpopCommand = lookupCommandByCString("rpop");
1591 server.sremCommand = lookupCommandByCString("srem");
1592 server.execCommand = lookupCommandByCString("exec");
1593
1594 /* Slow log */
1595 server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;
1596 server.slowlog_max_len = CONFIG_DEFAULT_SLOWLOG_MAX_LEN;
1597
1598 /* Latency monitor */
1599 server.latency_monitor_threshold = CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD;
1600
1601 /* Debugging */
1602 server.assert_failed = "<no assertion failed>";
1603 server.assert_file = "<no file>";
1604 server.assert_line = 0;
1605 server.bug_report_start = 0;
1606 server.watchdog_period = 0;
1607 }
1608
1609 extern char **environ;
1610
1611 /* Restart the server, executing the same executable that started this
1612 * instance, with the same arguments and configuration file.
1613 *
1614 * The function is designed to directly call execve() so that the new
1615 * server instance will retain the PID of the previous one.
1616 *
1617 * The list of flags, that may be bitwise ORed together, alter the
1618 * behavior of this function:
1619 *
1620 * RESTART_SERVER_NONE No flags.
1621 * RESTART_SERVER_GRACEFULLY Do a proper shutdown before restarting.
1622 * RESTART_SERVER_CONFIG_REWRITE Rewrite the config file before restarting.
1623 *
1624 * On success the function does not return, because the process turns into
1625 * a different process. On error C_ERR is returned. */
restartServer(int flags,mstime_t delay)1626 int restartServer(int flags, mstime_t delay) {
1627 int j;
1628
1629 /* Check if we still have accesses to the executable that started this
1630 * server instance. */
1631 if (access(server.executable,X_OK) == -1) return C_ERR;
1632
1633 /* Config rewriting. */
1634 if (flags & RESTART_SERVER_CONFIG_REWRITE &&
1635 server.configfile &&
1636 rewriteConfig(server.configfile) == -1) return C_ERR;
1637
1638 /* Perform a proper shutdown. */
1639 if (flags & RESTART_SERVER_GRACEFULLY &&
1640 prepareForShutdown(SHUTDOWN_NOFLAGS) != C_OK) return C_ERR;
1641
1642 /* Close all file descriptors, with the exception of stdin, stdout, strerr
1643 * which are useful if we restart a Redis server which is not daemonized. */
1644 for (j = 3; j < (int)server.maxclients + 1024; j++) close(j);
1645
1646 /* Execute the server with the original command line. */
1647 if (delay) usleep(delay*1000);
1648 execve(server.executable,server.exec_argv,environ);
1649
1650 /* If an error occurred here, there is nothing we can do, but exit. */
1651 _exit(1);
1652
1653 return C_ERR; /* Never reached. */
1654 }
1655
1656 /* This function will try to raise the max number of open files accordingly to
1657 * the configured max number of clients. It also reserves a number of file
1658 * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
1659 * persistence, listening sockets, log files and so forth.
1660 *
1661 * If it will not be possible to set the limit accordingly to the configured
1662 * max number of clients, the function will do the reverse setting
1663 * server.maxclients to the value that we can actually handle. */
adjustOpenFilesLimit(void)1664 void adjustOpenFilesLimit(void) {
1665 rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS;
1666 struct rlimit limit;
1667
1668 if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {
1669 serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
1670 strerror(errno));
1671 server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS;
1672 } else {
1673 rlim_t oldlimit = limit.rlim_cur;
1674
1675 /* Set the max number of files if the current limit is not enough
1676 * for our needs. */
1677 if (oldlimit < maxfiles) {
1678 rlim_t bestlimit;
1679 int setrlimit_error = 0;
1680
1681 /* Try to set the file limit to match 'maxfiles' or at least
1682 * to the higher value supported less than maxfiles. */
1683 bestlimit = maxfiles;
1684 while(bestlimit > oldlimit) {
1685 rlim_t decr_step = 16;
1686
1687 limit.rlim_cur = bestlimit;
1688 limit.rlim_max = bestlimit;
1689 if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break;
1690 setrlimit_error = errno;
1691
1692 /* We failed to set file limit to 'bestlimit'. Try with a
1693 * smaller limit decrementing by a few FDs per iteration. */
1694 if (bestlimit < decr_step) break;
1695 bestlimit -= decr_step;
1696 }
1697
1698 /* Assume that the limit we get initially is still valid if
1699 * our last try was even lower. */
1700 if (bestlimit < oldlimit) bestlimit = oldlimit;
1701
1702 if (bestlimit < maxfiles) {
1703 int old_maxclients = server.maxclients;
1704 server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS;
1705 if (server.maxclients < 1) {
1706 serverLog(LL_WARNING,"Your current 'ulimit -n' "
1707 "of %llu is not enough for the server to start. "
1708 "Please increase your open file limit to at least "
1709 "%llu. Exiting.",
1710 (unsigned long long) oldlimit,
1711 (unsigned long long) maxfiles);
1712 exit(1);
1713 }
1714 serverLog(LL_WARNING,"You requested maxclients of %d "
1715 "requiring at least %llu max file descriptors.",
1716 old_maxclients,
1717 (unsigned long long) maxfiles);
1718 serverLog(LL_WARNING,"Server can't set maximum open files "
1719 "to %llu because of OS error: %s.",
1720 (unsigned long long) maxfiles, strerror(setrlimit_error));
1721 serverLog(LL_WARNING,"Current maximum open files is %llu. "
1722 "maxclients has been reduced to %d to compensate for "
1723 "low ulimit. "
1724 "If you need higher maxclients increase 'ulimit -n'.",
1725 (unsigned long long) bestlimit, server.maxclients);
1726 } else {
1727 serverLog(LL_NOTICE,"Increased maximum number of open files "
1728 "to %llu (it was originally set to %llu).",
1729 (unsigned long long) maxfiles,
1730 (unsigned long long) oldlimit);
1731 }
1732 }
1733 }
1734 }
1735
/* Where /proc/sys/net/core/somaxconn is available (HAVE_PROC_SOMAXCONN),
 * read its value and log a warning when it is lower than the configured
 * server.tcp_backlog, because in that case the configured backlog cannot be
 * enforced. Silently does nothing if the file cannot be opened or read. */
void checkTcpBacklogSettings(void) {
#ifdef HAVE_PROC_SOMAXCONN
    char line[1024];
    int somaxconn;
    FILE *fp;

    fp = fopen("/proc/sys/net/core/somaxconn","r");
    if (fp == NULL) return;

    if (fgets(line,sizeof(line),fp) == NULL) {
        fclose(fp);
        return;
    }
    fclose(fp);

    /* Nothing to report for unparsable values or a large enough limit. */
    somaxconn = atoi(line);
    if (somaxconn <= 0 || somaxconn >= server.tcp_backlog) return;

    serverLog(LL_WARNING,"WARNING: The TCP backlog setting of %d cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of %d.", server.tcp_backlog, somaxconn);
#endif
}
1752
1753 /* Initialize a set of file descriptors to listen to the specified 'port'
1754 * binding the addresses specified in the Redis server configuration.
1755 *
1756 * The listening file descriptors are stored in the integer array 'fds'
1757 * and their number is set in '*count'.
1758 *
1759 * The addresses to bind are specified in the global server.bindaddr array
1760 * and their number is server.bindaddr_count. If the server configuration
1761 * contains no specific addresses to bind, this function will try to
1762 * bind * (all addresses) for both the IPv4 and IPv6 protocols.
1763 *
1764 * On success the function returns C_OK.
1765 *
1766 * On error the function returns C_ERR. For the function to be on
1767 * error, at least one of the server.bindaddr addresses was
1768 * impossible to bind, or no bind addresses were specified in the server
1769 * configuration but the function is not able to bind * for at least
1770 * one of the IPv4 or IPv6 protocols. */
listenToPort(int port,int * fds,int * count)1771 int listenToPort(int port, int *fds, int *count) {
1772 int j;
1773
1774 /* Force binding of 0.0.0.0 if no bind address is specified, always
1775 * entering the loop if j == 0. */
1776 if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
1777 for (j = 0; j < server.bindaddr_count || j == 0; j++) {
1778 if (server.bindaddr[j] == NULL) {
1779 /* Bind * for both IPv6 and IPv4, we enter here only if
1780 * server.bindaddr_count == 0. */
1781 fds[*count] = anetTcp6Server(server.neterr,port,NULL,
1782 server.tcp_backlog);
1783 if (fds[*count] != ANET_ERR) {
1784 anetNonBlock(NULL,fds[*count]);
1785 (*count)++;
1786
1787 /* Bind the IPv4 address as well. */
1788 fds[*count] = anetTcpServer(server.neterr,port,NULL,
1789 server.tcp_backlog);
1790 if (fds[*count] != ANET_ERR) {
1791 anetNonBlock(NULL,fds[*count]);
1792 (*count)++;
1793 }
1794 }
1795 /* Exit the loop if we were able to bind * on IPv4 and IPv6,
1796 * otherwise fds[*count] will be ANET_ERR and we'll print an
1797 * error and return to the caller with an error. */
1798 if (*count == 2) break;
1799 } else if (strchr(server.bindaddr[j],':')) {
1800 /* Bind IPv6 address. */
1801 fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
1802 server.tcp_backlog);
1803 } else {
1804 /* Bind IPv4 address. */
1805 fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
1806 server.tcp_backlog);
1807 }
1808 if (fds[*count] == ANET_ERR) {
1809 serverLog(LL_WARNING,
1810 "Creating Server TCP listening socket %s:%d: %s",
1811 server.bindaddr[j] ? server.bindaddr[j] : "*",
1812 port, server.neterr);
1813 return C_ERR;
1814 }
1815 anetNonBlock(NULL,fds[*count]);
1816 (*count)++;
1817 }
1818 return C_OK;
1819 }
1820
1821 /* Resets the stats that we expose via INFO or other means that we want
1822 * to reset via CONFIG RESETSTAT. The function is also used in order to
1823 * initialize these fields in initServer() at server startup. */
resetServerStats(void)1824 void resetServerStats(void) {
1825 int j;
1826
1827 server.stat_numcommands = 0;
1828 server.stat_numconnections = 0;
1829 server.stat_expiredkeys = 0;
1830 server.stat_evictedkeys = 0;
1831 server.stat_keyspace_misses = 0;
1832 server.stat_keyspace_hits = 0;
1833 server.stat_fork_time = 0;
1834 server.stat_fork_rate = 0;
1835 server.stat_rejected_conn = 0;
1836 server.stat_sync_full = 0;
1837 server.stat_sync_partial_ok = 0;
1838 server.stat_sync_partial_err = 0;
1839 for (j = 0; j < STATS_METRIC_COUNT; j++) {
1840 server.inst_metric[j].idx = 0;
1841 server.inst_metric[j].last_sample_time = mstime();
1842 server.inst_metric[j].last_sample_count = 0;
1843 memset(server.inst_metric[j].samples,0,
1844 sizeof(server.inst_metric[j].samples));
1845 }
1846 server.stat_net_input_bytes = 0;
1847 server.stat_net_output_bytes = 0;
1848 server.aof_delayed_fsync = 0;
1849 }
1850
initServer(void)1851 void initServer(void) {
1852 int j;
1853
1854 signal(SIGHUP, SIG_IGN);
1855 signal(SIGPIPE, SIG_IGN);
1856 setupSignalHandlers();
1857
1858 if (server.syslog_enabled) {
1859 openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
1860 server.syslog_facility);
1861 }
1862
1863 server.pid = getpid();
1864 server.current_client = NULL;
1865 server.clients = listCreate();
1866 server.clients_to_close = listCreate();
1867 server.slaves = listCreate();
1868 server.monitors = listCreate();
1869 server.clients_pending_write = listCreate();
1870 server.slaveseldb = -1; /* Force to emit the first SELECT command. */
1871 server.unblocked_clients = listCreate();
1872 server.ready_keys = listCreate();
1873 server.clients_waiting_acks = listCreate();
1874 server.get_ack_from_slaves = 0;
1875 server.clients_paused = 0;
1876 server.system_memory_size = zmalloc_get_memory_size();
1877
1878 createSharedObjects();
1879 adjustOpenFilesLimit();
1880 server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
1881 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1882
1883 /* Open the TCP listening socket for the user commands. */
1884 if (server.port != 0 &&
1885 listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
1886 exit(1);
1887
1888 /* Open the listening Unix domain socket. */
1889 if (server.unixsocket != NULL) {
1890 unlink(server.unixsocket); /* don't care if this fails */
1891 server.sofd = anetUnixServer(server.neterr,server.unixsocket,
1892 server.unixsocketperm, server.tcp_backlog);
1893 if (server.sofd == ANET_ERR) {
1894 serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
1895 exit(1);
1896 }
1897 anetNonBlock(NULL,server.sofd);
1898 }
1899
1900 /* Abort if there are no listening sockets at all. */
1901 if (server.ipfd_count == 0 && server.sofd < 0) {
1902 serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
1903 exit(1);
1904 }
1905
1906 /* Create the Redis databases, and initialize other internal state. */
1907 for (j = 0; j < server.dbnum; j++) {
1908 server.db[j].dict = dictCreate(&dbDictType,NULL);
1909 server.db[j].expires = dictCreate(&keyptrDictType,NULL);
1910 server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
1911 server.db[j].ready_keys = dictCreate(&setDictType,NULL);
1912 server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
1913 server.db[j].eviction_pool = evictionPoolAlloc();
1914 server.db[j].id = j;
1915 server.db[j].avg_ttl = 0;
1916 }
1917 server.pubsub_channels = dictCreate(&keylistDictType,NULL);
1918 server.pubsub_patterns = listCreate();
1919 listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
1920 listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
1921 server.cronloops = 0;
1922 server.rdb_child_pid = -1;
1923 server.aof_child_pid = -1;
1924 server.rdb_child_type = RDB_CHILD_TYPE_NONE;
1925 server.rdb_bgsave_scheduled = 0;
1926 aofRewriteBufferReset();
1927 server.aof_buf = sdsempty();
1928 server.lastsave = time(NULL); /* At startup we consider the DB saved. */
1929 server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
1930 server.rdb_save_time_last = -1;
1931 server.rdb_save_time_start = -1;
1932 server.dirty = 0;
1933 resetServerStats();
1934 /* A few stats we don't want to reset: server startup time, and peak mem. */
1935 server.stat_starttime = time(NULL);
1936 server.stat_peak_memory = 0;
1937 server.resident_set_size = 0;
1938 server.lastbgsave_status = C_OK;
1939 server.aof_last_write_status = C_OK;
1940 server.aof_last_write_errno = 0;
1941 server.repl_good_slaves_count = 0;
1942 updateCachedTime();
1943
1944 /* Create the serverCron() time event, that's our main way to process
1945 * background operations. */
1946 if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
1947 serverPanic("Can't create the serverCron time event.");
1948 exit(1);
1949 }
1950
1951 /* Create an event handler for accepting new connections in TCP and Unix
1952 * domain sockets. */
1953 for (j = 0; j < server.ipfd_count; j++) {
1954 if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
1955 acceptTcpHandler,NULL) == AE_ERR)
1956 {
1957 serverPanic(
1958 "Unrecoverable error creating server.ipfd file event.");
1959 }
1960 }
1961 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
1962 acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
1963
1964 /* Open the AOF file if needed. */
1965 if (server.aof_state == AOF_ON) {
1966 server.aof_fd = open(server.aof_filename,
1967 O_WRONLY|O_APPEND|O_CREAT,0644);
1968 if (server.aof_fd == -1) {
1969 serverLog(LL_WARNING, "Can't open the append-only file: %s",
1970 strerror(errno));
1971 exit(1);
1972 }
1973 }
1974
1975 /* 32 bit instances are limited to 4GB of address space, so if there is
1976 * no explicit limit in the user provided configuration we set a limit
1977 * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
1978 * useless crashes of the Redis instance for out of memory. */
1979 if (server.arch_bits == 32 && server.maxmemory == 0) {
1980 serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
1981 server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
1982 server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
1983 }
1984
1985 if (server.cluster_enabled) clusterInit();
1986 replicationScriptCacheInit();
1987 scriptingInit(1);
1988 slowlogInit();
1989 latencyMonitorInit();
1990 bioInit();
1991 }
1992
1993 /* Populates the Redis Command Table starting from the hard coded list
1994 * we have on top of redis.c file. */
populateCommandTable(void)1995 void populateCommandTable(void) {
1996 int j;
1997 int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
1998
1999 for (j = 0; j < numcommands; j++) {
2000 struct redisCommand *c = redisCommandTable+j;
2001 char *f = c->sflags;
2002 int retval1, retval2;
2003
2004 while(*f != '\0') {
2005 switch(*f) {
2006 case 'w': c->flags |= CMD_WRITE; break;
2007 case 'r': c->flags |= CMD_READONLY; break;
2008 case 'm': c->flags |= CMD_DENYOOM; break;
2009 case 'a': c->flags |= CMD_ADMIN; break;
2010 case 'p': c->flags |= CMD_PUBSUB; break;
2011 case 's': c->flags |= CMD_NOSCRIPT; break;
2012 case 'R': c->flags |= CMD_RANDOM; break;
2013 case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
2014 case 'l': c->flags |= CMD_LOADING; break;
2015 case 't': c->flags |= CMD_STALE; break;
2016 case 'M': c->flags |= CMD_SKIP_MONITOR; break;
2017 case 'k': c->flags |= CMD_ASKING; break;
2018 case 'F': c->flags |= CMD_FAST; break;
2019 default: serverPanic("Unsupported command flag"); break;
2020 }
2021 f++;
2022 }
2023
2024 retval1 = dictAdd(server.commands, sdsnew(c->name), c);
2025 /* Populate an additional dictionary that will be unaffected
2026 * by rename-command statements in redis.conf. */
2027 retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
2028 serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
2029 }
2030 }
2031
resetCommandTableStats(void)2032 void resetCommandTableStats(void) {
2033 int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
2034 int j;
2035
2036 for (j = 0; j < numcommands; j++) {
2037 struct redisCommand *c = redisCommandTable+j;
2038
2039 c->microseconds = 0;
2040 c->calls = 0;
2041 }
2042 }
2043
2044 /* ========================== Redis OP Array API ============================ */
2045
/* Put 'oa' in the empty state: no backing storage and zero operations.
 * Any previous storage is not released (use redisOpArrayFree() for that). */
void redisOpArrayInit(redisOpArray *oa) {
    oa->numops = 0;
    oa->ops = NULL;
}
2050
/* Append one operation to 'oa', growing its storage by exactly one slot.
 * The 'argv' pointer is stored as-is; redisOpArrayFree() later releases the
 * argument objects and the vector itself. Returns the new number of
 * operations in the array. */
int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
                       robj **argv, int argc, int target)
{
    int slot = oa->numops;
    redisOp *op;

    oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(slot+1));
    op = &oa->ops[slot];
    op->cmd = cmd;
    op->dbid = dbid;
    op->argv = argv;
    op->argc = argc;
    op->target = target;
    oa->numops = slot+1;
    return oa->numops;
}
2066
/* Release every operation stored in 'oa', last to first. For each one,
 * drop one reference from each argument object and free its argv vector.
 * Then free the operations storage itself. On return 'oa' is back in the
 * empty state (ops == NULL, numops == 0), so it can be appended to or freed
 * again without touching released memory. */
void redisOpArrayFree(redisOpArray *oa) {
    while(oa->numops) {
        int j;
        redisOp *op;

        oa->numops--;
        op = oa->ops+oa->numops;
        for (j = 0; j < op->argc; j++)
            decrRefCount(op->argv[j]);
        zfree(op->argv);
    }
    zfree(oa->ops);
    /* Don't leave a dangling pointer behind: a later redisOpArrayAppend()
     * would otherwise zrealloc() freed memory, and a second
     * redisOpArrayFree() would double free it. */
    oa->ops = NULL;
}
2080
2081 /* ====================== Commands lookup and execution ===================== */
2082
/* Return the command registered as 'name' in the current (possibly
 * renamed) command table, or NULL if there is none. */
struct redisCommand *lookupCommand(sds name) {
    struct redisCommand *cmd = dictFetchValue(server.commands, name);
    return cmd;
}
2086
lookupCommandByCString(char * s)2087 struct redisCommand *lookupCommandByCString(char *s) {
2088 struct redisCommand *cmd;
2089 sds name = sdsnew(s);
2090
2091 cmd = dictFetchValue(server.commands, name);
2092 sdsfree(name);
2093 return cmd;
2094 }
2095
2096 /* Lookup the command in the current table, if not found also check in
2097 * the original table containing the original command names unaffected by
2098 * redis.conf rename-command statement.
2099 *
2100 * This is used by functions rewriting the argument vector such as
2101 * rewriteClientCommandVector() in order to set client->cmd pointer
2102 * correctly even if the command was renamed. */
lookupCommandOrOriginal(sds name)2103 struct redisCommand *lookupCommandOrOriginal(sds name) {
2104 struct redisCommand *cmd = dictFetchValue(server.commands, name);
2105
2106 if (!cmd) cmd = dictFetchValue(server.orig_commands,name);
2107 return cmd;
2108 }
2109
2110 /* Propagate the specified command (in the context of the specified database id)
2111 * to AOF and Slaves.
2112 *
2113 * flags are an xor between:
2114 * + PROPAGATE_NONE (no propagation of command at all)
2115 * + PROPAGATE_AOF (propagate into the AOF file if is enabled)
2116 * + PROPAGATE_REPL (propagate into the replication link)
2117 *
2118 * This should not be used inside commands implementation. Use instead
2119 * alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().
2120 */
propagate(struct redisCommand * cmd,int dbid,robj ** argv,int argc,int flags)2121 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2122 int flags)
2123 {
2124 if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
2125 feedAppendOnlyFile(cmd,dbid,argv,argc);
2126 if (flags & PROPAGATE_REPL)
2127 replicationFeedSlaves(server.slaves,dbid,argv,argc);
2128 }
2129
2130 /* Used inside commands to schedule the propagation of additional commands
2131 * after the current command is propagated to AOF / Replication.
2132 *
2133 * 'cmd' must be a pointer to the Redis command to replicate, dbid is the
2134 * database ID the command should be propagated into.
2135 * Arguments of the command to propagte are passed as an array of redis
2136 * objects pointers of len 'argc', using the 'argv' vector.
2137 *
2138 * The function does not take a reference to the passed 'argv' vector,
2139 * so it is up to the caller to release the passed argv (but it is usually
2140 * stack allocated). The function autoamtically increments ref count of
2141 * passed objects, so the caller does not need to. */
alsoPropagate(struct redisCommand * cmd,int dbid,robj ** argv,int argc,int target)2142 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
2143 int target)
2144 {
2145 robj **argvcopy;
2146 int j;
2147
2148 if (server.loading) return; /* No propagation during loading. */
2149
2150 argvcopy = zmalloc(sizeof(robj*)*argc);
2151 for (j = 0; j < argc; j++) {
2152 argvcopy[j] = argv[j];
2153 incrRefCount(argv[j]);
2154 }
2155 redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target);
2156 }
2157
2158 /* It is possible to call the function forceCommandPropagation() inside a
2159 * Redis command implementation in order to to force the propagation of a
2160 * specific command execution into AOF / Replication. */
forceCommandPropagation(client * c,int flags)2161 void forceCommandPropagation(client *c, int flags) {
2162 if (flags & PROPAGATE_REPL) c->flags |= CLIENT_FORCE_REPL;
2163 if (flags & PROPAGATE_AOF) c->flags |= CLIENT_FORCE_AOF;
2164 }
2165
2166 /* Avoid that the executed command is propagated at all. This way we
2167 * are free to just propagate what we want using the alsoPropagate()
2168 * API. */
preventCommandPropagation(client * c)2169 void preventCommandPropagation(client *c) {
2170 c->flags |= CLIENT_PREVENT_PROP;
2171 }
2172
2173 /* AOF specific version of preventCommandPropagation(). */
preventCommandAOF(client * c)2174 void preventCommandAOF(client *c) {
2175 c->flags |= CLIENT_PREVENT_AOF_PROP;
2176 }
2177
2178 /* Replication specific version of preventCommandPropagation(). */
preventCommandReplication(client * c)2179 void preventCommandReplication(client *c) {
2180 c->flags |= CLIENT_PREVENT_REPL_PROP;
2181 }
2182
2183 /* Call() is the core of Redis execution of a command.
2184 *
2185 * The following flags can be passed:
2186 * CMD_CALL_NONE No flags.
2187 * CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed.
2188 * CMD_CALL_STATS Populate command stats.
2189 * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset
2190 * or if the client flags are forcing propagation.
2191 * CMD_CALL_PROPAGATE_REPL Send command to salves if it modified the dataset
2192 * or if the client flags are forcing propagation.
2193 * CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL.
2194 * CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE.
2195 *
2196 * The exact propagation behavior depends on the client flags.
2197 * Specifically:
2198 *
2199 * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
2200 * and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
2201 * in the call flags, then the command is propagated even if the
2202 * dataset was not affected by the command.
2203 * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
2204 * are set, the propagation into AOF or to slaves is not performed even
2205 * if the command modified the dataset.
2206 *
2207 * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
2208 * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
2209 * slaves propagation will never occur.
2210 *
2211 * Client flags are modified by the implementation of a given command
2212 * using the following API:
2213 *
2214 * forceCommandPropagation(client *c, int flags);
2215 * preventCommandPropagation(client *c);
2216 * preventCommandAOF(client *c);
2217 * preventCommandReplication(client *c);
2218 *
2219 */
call(client * c,int flags)2220 void call(client *c, int flags) {
2221 long long dirty, start, duration;
2222 int client_old_flags = c->flags;
2223
2224 /* Sent the command to clients in MONITOR mode, only if the commands are
2225 * not generated from reading an AOF. */
2226 if (listLength(server.monitors) &&
2227 !server.loading &&
2228 !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
2229 {
2230 replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
2231 }
2232
2233 /* Initialization: clear the flags that must be set by the command on
2234 * demand, and initialize the array for additional commands propagation. */
2235 c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2236 redisOpArrayInit(&server.also_propagate);
2237
2238 /* Call the command. */
2239 dirty = server.dirty;
2240 start = ustime();
2241 c->cmd->proc(c);
2242 duration = ustime()-start;
2243 dirty = server.dirty-dirty;
2244 if (dirty < 0) dirty = 0;
2245
2246 /* When EVAL is called loading the AOF we don't want commands called
2247 * from Lua to go into the slowlog or to populate statistics. */
2248 if (server.loading && c->flags & CLIENT_LUA)
2249 flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
2250
2251 /* If the caller is Lua, we want to force the EVAL caller to propagate
2252 * the script if the command flag or client flag are forcing the
2253 * propagation. */
2254 if (c->flags & CLIENT_LUA && server.lua_caller) {
2255 if (c->flags & CLIENT_FORCE_REPL)
2256 server.lua_caller->flags |= CLIENT_FORCE_REPL;
2257 if (c->flags & CLIENT_FORCE_AOF)
2258 server.lua_caller->flags |= CLIENT_FORCE_AOF;
2259 }
2260
2261 /* Log the command into the Slow log if needed, and populate the
2262 * per-command statistics that we show in INFO commandstats. */
2263 if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
2264 char *latency_event = (c->cmd->flags & CMD_FAST) ?
2265 "fast-command" : "command";
2266 latencyAddSampleIfNeeded(latency_event,duration/1000);
2267 slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
2268 }
2269 if (flags & CMD_CALL_STATS) {
2270 c->lastcmd->microseconds += duration;
2271 c->lastcmd->calls++;
2272 }
2273
2274 /* Propagate the command into the AOF and replication link */
2275 if (flags & CMD_CALL_PROPAGATE &&
2276 (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
2277 {
2278 int propagate_flags = PROPAGATE_NONE;
2279
2280 /* Check if the command operated changes in the data set. If so
2281 * set for replication / AOF propagation. */
2282 if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
2283
2284 /* If the client forced AOF / replication of the command, set
2285 * the flags regardless of the command effects on the data set. */
2286 if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
2287 if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
2288
2289 /* However prevent AOF / replication propagation if the command
2290 * implementatino called preventCommandPropagation() or similar,
2291 * or if we don't have the call() flags to do so. */
2292 if (c->flags & CLIENT_PREVENT_REPL_PROP ||
2293 !(flags & CMD_CALL_PROPAGATE_REPL))
2294 propagate_flags &= ~PROPAGATE_REPL;
2295 if (c->flags & CLIENT_PREVENT_AOF_PROP ||
2296 !(flags & CMD_CALL_PROPAGATE_AOF))
2297 propagate_flags &= ~PROPAGATE_AOF;
2298
2299 /* Call propagate() only if at least one of AOF / replication
2300 * propagation is needed. */
2301 if (propagate_flags != PROPAGATE_NONE)
2302 propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
2303 }
2304
2305 /* Restore the old replication flags, since call() can be executed
2306 * recursively. */
2307 c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2308 c->flags |= client_old_flags &
2309 (CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2310
2311 /* Handle the alsoPropagate() API to handle commands that want to propagate
2312 * multiple separated commands. Note that alsoPropagate() is not affected
2313 * by CLIENT_PREVENT_PROP flag. */
2314 if (server.also_propagate.numops) {
2315 int j;
2316 redisOp *rop;
2317
2318 if (flags & CMD_CALL_PROPAGATE) {
2319 for (j = 0; j < server.also_propagate.numops; j++) {
2320 rop = &server.also_propagate.ops[j];
2321 int target = rop->target;
2322 /* Whatever the command wish is, we honor the call() flags. */
2323 if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
2324 if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
2325 if (target)
2326 propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
2327 }
2328 }
2329 redisOpArrayFree(&server.also_propagate);
2330 }
2331 server.stat_numcommands++;
2332 }
2333
2334 /* If this function gets called we already read a whole
2335 * command, arguments are in the client argv/argc fields.
2336 * processCommand() execute the command or prepare the
2337 * server for a bulk read from the client.
2338 *
2339 * If C_OK is returned the client is still alive and valid and
2340 * other operations can be performed by the caller. Otherwise
2341 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
processCommand(client * c)2342 int processCommand(client *c) {
2343 /* The QUIT command is handled separately. Normal command procs will
2344 * go through checking for replication and QUIT will cause trouble
2345 * when FORCE_REPLICATION is enabled and would be implemented in
2346 * a regular command proc. */
2347 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
2348 addReply(c,shared.ok);
2349 c->flags |= CLIENT_CLOSE_AFTER_REPLY;
2350 return C_ERR;
2351 }
2352
2353 /* Now lookup the command and check ASAP about trivial error conditions
2354 * such as wrong arity, bad command name and so forth. */
2355 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
2356 if (!c->cmd) {
2357 flagTransaction(c);
2358 addReplyErrorFormat(c,"unknown command '%s'",
2359 (char*)c->argv[0]->ptr);
2360 return C_OK;
2361 } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
2362 (c->argc < -c->cmd->arity)) {
2363 flagTransaction(c);
2364 addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2365 c->cmd->name);
2366 return C_OK;
2367 }
2368
2369 /* Check if the user is authenticated */
2370 if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
2371 {
2372 flagTransaction(c);
2373 addReply(c,shared.noautherr);
2374 return C_OK;
2375 }
2376
2377 /* If cluster is enabled perform the cluster redirection here.
2378 * However we don't perform the redirection if:
2379 * 1) The sender of this command is our master.
2380 * 2) The command has no key arguments. */
2381 if (server.cluster_enabled &&
2382 !(c->flags & CLIENT_MASTER) &&
2383 !(c->flags & CLIENT_LUA &&
2384 server.lua_caller->flags & CLIENT_MASTER) &&
2385 !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
2386 c->cmd->proc != execCommand))
2387 {
2388 int hashslot;
2389 int error_code;
2390 clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
2391 &hashslot,&error_code);
2392 if (n == NULL || n != server.cluster->myself) {
2393 if (c->cmd->proc == execCommand) {
2394 discardTransaction(c);
2395 } else {
2396 flagTransaction(c);
2397 }
2398 clusterRedirectClient(c,n,hashslot,error_code);
2399 return C_OK;
2400 }
2401 }
2402
2403 /* Handle the maxmemory directive.
2404 *
2405 * First we try to free some memory if possible (if there are volatile
2406 * keys in the dataset). If there are not the only thing we can do
2407 * is returning an error. */
2408 if (server.maxmemory) {
2409 int retval = freeMemoryIfNeeded();
2410 /* freeMemoryIfNeeded may flush slave output buffers. This may result
2411 * into a slave, that may be the active client, to be freed. */
2412 if (server.current_client == NULL) return C_ERR;
2413
2414 /* It was impossible to free enough memory, and the command the client
2415 * is trying to execute is denied during OOM conditions? Error. */
2416 if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
2417 flagTransaction(c);
2418 addReply(c, shared.oomerr);
2419 return C_OK;
2420 }
2421 }
2422
2423 /* Don't accept write commands if there are problems persisting on disk
2424 * and if this is a master instance. */
2425 if (((server.stop_writes_on_bgsave_err &&
2426 server.saveparamslen > 0 &&
2427 server.lastbgsave_status == C_ERR) ||
2428 server.aof_last_write_status == C_ERR) &&
2429 server.masterhost == NULL &&
2430 (c->cmd->flags & CMD_WRITE ||
2431 c->cmd->proc == pingCommand))
2432 {
2433 flagTransaction(c);
2434 if (server.aof_last_write_status == C_OK)
2435 addReply(c, shared.bgsaveerr);
2436 else
2437 addReplySds(c,
2438 sdscatprintf(sdsempty(),
2439 "-MISCONF Errors writing to the AOF file: %s\r\n",
2440 strerror(server.aof_last_write_errno)));
2441 return C_OK;
2442 }
2443
2444 /* Don't accept write commands if there are not enough good slaves and
2445 * user configured the min-slaves-to-write option. */
2446 if (server.masterhost == NULL &&
2447 server.repl_min_slaves_to_write &&
2448 server.repl_min_slaves_max_lag &&
2449 c->cmd->flags & CMD_WRITE &&
2450 server.repl_good_slaves_count < server.repl_min_slaves_to_write)
2451 {
2452 flagTransaction(c);
2453 addReply(c, shared.noreplicaserr);
2454 return C_OK;
2455 }
2456
2457 /* Don't accept write commands if this is a read only slave. But
2458 * accept write commands if this is our master. */
2459 if (server.masterhost && server.repl_slave_ro &&
2460 !(c->flags & CLIENT_MASTER) &&
2461 c->cmd->flags & CMD_WRITE)
2462 {
2463 addReply(c, shared.roslaveerr);
2464 return C_OK;
2465 }
2466
2467 /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
2468 if (c->flags & CLIENT_PUBSUB &&
2469 c->cmd->proc != pingCommand &&
2470 c->cmd->proc != subscribeCommand &&
2471 c->cmd->proc != unsubscribeCommand &&
2472 c->cmd->proc != psubscribeCommand &&
2473 c->cmd->proc != punsubscribeCommand) {
2474 addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
2475 return C_OK;
2476 }
2477
2478 /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
2479 * we are a slave with a broken link with master. */
2480 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
2481 server.repl_serve_stale_data == 0 &&
2482 !(c->cmd->flags & CMD_STALE))
2483 {
2484 flagTransaction(c);
2485 addReply(c, shared.masterdownerr);
2486 return C_OK;
2487 }
2488
2489 /* Loading DB? Return an error if the command has not the
2490 * CMD_LOADING flag. */
2491 if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
2492 addReply(c, shared.loadingerr);
2493 return C_OK;
2494 }
2495
2496 /* Lua script too slow? Only allow a limited number of commands. */
2497 if (server.lua_timedout &&
2498 c->cmd->proc != authCommand &&
2499 c->cmd->proc != replconfCommand &&
2500 !(c->cmd->proc == shutdownCommand &&
2501 c->argc == 2 &&
2502 tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
2503 !(c->cmd->proc == scriptCommand &&
2504 c->argc == 2 &&
2505 tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
2506 {
2507 flagTransaction(c);
2508 addReply(c, shared.slowscripterr);
2509 return C_OK;
2510 }
2511
2512 /* Exec the command */
2513 if (c->flags & CLIENT_MULTI &&
2514 c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
2515 c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
2516 {
2517 queueMultiCommand(c);
2518 addReply(c,shared.queued);
2519 } else {
2520 call(c,CMD_CALL_FULL);
2521 c->woff = server.master_repl_offset;
2522 if (listLength(server.ready_keys))
2523 handleClientsBlockedOnLists();
2524 }
2525 return C_OK;
2526 }
2527
2528 /*================================== Shutdown =============================== */
2529
2530 /* Close listening sockets. Also unlink the unix domain socket if
2531 * unlink_unix_socket is non-zero. */
closeListeningSockets(int unlink_unix_socket)2532 void closeListeningSockets(int unlink_unix_socket) {
2533 int j;
2534
2535 for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]);
2536 if (server.sofd != -1) close(server.sofd);
2537 if (server.cluster_enabled)
2538 for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]);
2539 if (unlink_unix_socket && server.unixsocket) {
2540 serverLog(LL_NOTICE,"Removing the unix socket file.");
2541 unlink(server.unixsocket); /* don't care if this fails */
2542 }
2543 }
2544
prepareForShutdown(int flags)2545 int prepareForShutdown(int flags) {
2546 int save = flags & SHUTDOWN_SAVE;
2547 int nosave = flags & SHUTDOWN_NOSAVE;
2548
2549 serverLog(LL_WARNING,"User requested shutdown...");
2550
2551 /* Kill all the Lua debugger forked sessions. */
2552 ldbKillForkedSessions();
2553
2554 /* Kill the saving child if there is a background saving in progress.
2555 We want to avoid race conditions, for instance our saving child may
2556 overwrite the synchronous saving did by SHUTDOWN. */
2557 if (server.rdb_child_pid != -1) {
2558 serverLog(LL_WARNING,"There is a child saving an .rdb. Killing it!");
2559 kill(server.rdb_child_pid,SIGUSR1);
2560 rdbRemoveTempFile(server.rdb_child_pid);
2561 }
2562
2563 if (server.aof_state != AOF_OFF) {
2564 /* Kill the AOF saving child as the AOF we already have may be longer
2565 * but contains the full dataset anyway. */
2566 if (server.aof_child_pid != -1) {
2567 /* If we have AOF enabled but haven't written the AOF yet, don't
2568 * shutdown or else the dataset will be lost. */
2569 if (server.aof_state == AOF_WAIT_REWRITE) {
2570 serverLog(LL_WARNING, "Writing initial AOF, can't exit.");
2571 return C_ERR;
2572 }
2573 serverLog(LL_WARNING,
2574 "There is a child rewriting the AOF. Killing it!");
2575 kill(server.aof_child_pid,SIGUSR1);
2576 }
2577 /* Append only file: fsync() the AOF and exit */
2578 serverLog(LL_NOTICE,"Calling fsync() on the AOF file.");
2579 aof_fsync(server.aof_fd);
2580 }
2581
2582 /* Create a new RDB file before exiting. */
2583 if ((server.saveparamslen > 0 && !nosave) || save) {
2584 serverLog(LL_NOTICE,"Saving the final RDB snapshot before exiting.");
2585 /* Snapshotting. Perform a SYNC SAVE and exit */
2586 if (rdbSave(server.rdb_filename) != C_OK) {
2587 /* Ooops.. error saving! The best we can do is to continue
2588 * operating. Note that if there was a background saving process,
2589 * in the next cron() Redis will be notified that the background
2590 * saving aborted, handling special stuff like slaves pending for
2591 * synchronization... */
2592 serverLog(LL_WARNING,"Error trying to save the DB, can't exit.");
2593 return C_ERR;
2594 }
2595 }
2596
2597 /* Remove the pid file if possible and needed. */
2598 if (server.daemonize || server.pidfile) {
2599 serverLog(LL_NOTICE,"Removing the pid file.");
2600 unlink(server.pidfile);
2601 }
2602
2603 /* Best effort flush of slave output buffers, so that we hopefully
2604 * send them pending writes. */
2605 flushSlavesOutputBuffers();
2606
2607 /* Close the listening sockets. Apparently this allows faster restarts. */
2608 closeListeningSockets(1);
2609 serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
2610 server.sentinel_mode ? "Sentinel" : "Redis");
2611 return C_OK;
2612 }
2613
2614 /*================================== Commands =============================== */
2615
2616 /* Return zero if strings are the same, non-zero if they are not.
2617 * The comparison is performed in a way that prevents an attacker to obtain
2618 * information about the nature of the strings just monitoring the execution
2619 * time of the function.
2620 *
2621 * Note that limiting the comparison length to strings up to 512 bytes we
2622 * can avoid leaking any information about the password length and any
2623 * possible branch misprediction related leak.
2624 */
time_independent_strcmp(char * a,char * b)2625 int time_independent_strcmp(char *a, char *b) {
2626 char bufa[CONFIG_AUTHPASS_MAX_LEN], bufb[CONFIG_AUTHPASS_MAX_LEN];
2627 /* The above two strlen perform len(a) + len(b) operations where either
2628 * a or b are fixed (our password) length, and the difference is only
2629 * relative to the length of the user provided string, so no information
2630 * leak is possible in the following two lines of code. */
2631 unsigned int alen = strlen(a);
2632 unsigned int blen = strlen(b);
2633 unsigned int j;
2634 int diff = 0;
2635
2636 /* We can't compare strings longer than our static buffers.
2637 * Note that this will never pass the first test in practical circumstances
2638 * so there is no info leak. */
2639 if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;
2640
2641 memset(bufa,0,sizeof(bufa)); /* Constant time. */
2642 memset(bufb,0,sizeof(bufb)); /* Constant time. */
2643 /* Again the time of the following two copies is proportional to
2644 * len(a) + len(b) so no info is leaked. */
2645 memcpy(bufa,a,alen);
2646 memcpy(bufb,b,blen);
2647
2648 /* Always compare all the chars in the two buffers without
2649 * conditional expressions. */
2650 for (j = 0; j < sizeof(bufa); j++) {
2651 diff |= (bufa[j] ^ bufb[j]);
2652 }
2653 /* Length must be equal as well. */
2654 diff |= alen ^ blen;
2655 return diff; /* If zero strings are the same. */
2656 }
2657
authCommand(client * c)2658 void authCommand(client *c) {
2659 if (!server.requirepass) {
2660 addReplyError(c,"Client sent AUTH, but no password is set");
2661 } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
2662 c->authenticated = 1;
2663 addReply(c,shared.ok);
2664 } else {
2665 c->authenticated = 0;
2666 addReplyError(c,"invalid password");
2667 }
2668 }
2669
2670 /* The PING command. It works in a different way if the client is in
2671 * in Pub/Sub mode. */
pingCommand(client * c)2672 void pingCommand(client *c) {
2673 /* The command takes zero or one arguments. */
2674 if (c->argc > 2) {
2675 addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
2676 c->cmd->name);
2677 return;
2678 }
2679
2680 if (c->flags & CLIENT_PUBSUB) {
2681 addReply(c,shared.mbulkhdr[2]);
2682 addReplyBulkCBuffer(c,"pong",4);
2683 if (c->argc == 1)
2684 addReplyBulkCBuffer(c,"",0);
2685 else
2686 addReplyBulk(c,c->argv[1]);
2687 } else {
2688 if (c->argc == 1)
2689 addReply(c,shared.pong);
2690 else
2691 addReplyBulk(c,c->argv[1]);
2692 }
2693 }
2694
echoCommand(client * c)2695 void echoCommand(client *c) {
2696 addReplyBulk(c,c->argv[1]);
2697 }
2698
timeCommand(client * c)2699 void timeCommand(client *c) {
2700 struct timeval tv;
2701
2702 /* gettimeofday() can only fail if &tv is a bad address so we
2703 * don't check for errors. */
2704 gettimeofday(&tv,NULL);
2705 addReplyMultiBulkLen(c,2);
2706 addReplyBulkLongLong(c,tv.tv_sec);
2707 addReplyBulkLongLong(c,tv.tv_usec);
2708 }
2709
2710 /* Helper function for addReplyCommand() to output flags. */
addReplyCommandFlag(client * c,struct redisCommand * cmd,int f,char * reply)2711 int addReplyCommandFlag(client *c, struct redisCommand *cmd, int f, char *reply) {
2712 if (cmd->flags & f) {
2713 addReplyStatus(c, reply);
2714 return 1;
2715 }
2716 return 0;
2717 }
2718
2719 /* Output the representation of a Redis command. Used by the COMMAND command. */
addReplyCommand(client * c,struct redisCommand * cmd)2720 void addReplyCommand(client *c, struct redisCommand *cmd) {
2721 if (!cmd) {
2722 addReply(c, shared.nullbulk);
2723 } else {
2724 /* We are adding: command name, arg count, flags, first, last, offset */
2725 addReplyMultiBulkLen(c, 6);
2726 addReplyBulkCString(c, cmd->name);
2727 addReplyLongLong(c, cmd->arity);
2728
2729 int flagcount = 0;
2730 void *flaglen = addDeferredMultiBulkLength(c);
2731 flagcount += addReplyCommandFlag(c,cmd,CMD_WRITE, "write");
2732 flagcount += addReplyCommandFlag(c,cmd,CMD_READONLY, "readonly");
2733 flagcount += addReplyCommandFlag(c,cmd,CMD_DENYOOM, "denyoom");
2734 flagcount += addReplyCommandFlag(c,cmd,CMD_ADMIN, "admin");
2735 flagcount += addReplyCommandFlag(c,cmd,CMD_PUBSUB, "pubsub");
2736 flagcount += addReplyCommandFlag(c,cmd,CMD_NOSCRIPT, "noscript");
2737 flagcount += addReplyCommandFlag(c,cmd,CMD_RANDOM, "random");
2738 flagcount += addReplyCommandFlag(c,cmd,CMD_SORT_FOR_SCRIPT,"sort_for_script");
2739 flagcount += addReplyCommandFlag(c,cmd,CMD_LOADING, "loading");
2740 flagcount += addReplyCommandFlag(c,cmd,CMD_STALE, "stale");
2741 flagcount += addReplyCommandFlag(c,cmd,CMD_SKIP_MONITOR, "skip_monitor");
2742 flagcount += addReplyCommandFlag(c,cmd,CMD_ASKING, "asking");
2743 flagcount += addReplyCommandFlag(c,cmd,CMD_FAST, "fast");
2744 if (cmd->getkeys_proc) {
2745 addReplyStatus(c, "movablekeys");
2746 flagcount += 1;
2747 }
2748 setDeferredMultiBulkLength(c, flaglen, flagcount);
2749
2750 addReplyLongLong(c, cmd->firstkey);
2751 addReplyLongLong(c, cmd->lastkey);
2752 addReplyLongLong(c, cmd->keystep);
2753 }
2754 }
2755
2756 /* COMMAND <subcommand> <args> */
commandCommand(client * c)2757 void commandCommand(client *c) {
2758 dictIterator *di;
2759 dictEntry *de;
2760
2761 if (c->argc == 1) {
2762 addReplyMultiBulkLen(c, dictSize(server.commands));
2763 di = dictGetIterator(server.commands);
2764 while ((de = dictNext(di)) != NULL) {
2765 addReplyCommand(c, dictGetVal(de));
2766 }
2767 dictReleaseIterator(di);
2768 } else if (!strcasecmp(c->argv[1]->ptr, "info")) {
2769 int i;
2770 addReplyMultiBulkLen(c, c->argc-2);
2771 for (i = 2; i < c->argc; i++) {
2772 addReplyCommand(c, dictFetchValue(server.commands, c->argv[i]->ptr));
2773 }
2774 } else if (!strcasecmp(c->argv[1]->ptr, "count") && c->argc == 2) {
2775 addReplyLongLong(c, dictSize(server.commands));
2776 } else if (!strcasecmp(c->argv[1]->ptr,"getkeys") && c->argc >= 3) {
2777 struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
2778 int *keys, numkeys, j;
2779
2780 if (!cmd) {
2781 addReplyErrorFormat(c,"Invalid command specified");
2782 return;
2783 } else if ((cmd->arity > 0 && cmd->arity != c->argc-2) ||
2784 ((c->argc-2) < -cmd->arity))
2785 {
2786 addReplyError(c,"Invalid number of arguments specified for command");
2787 return;
2788 }
2789
2790 keys = getKeysFromCommand(cmd,c->argv+2,c->argc-2,&numkeys);
2791 addReplyMultiBulkLen(c,numkeys);
2792 for (j = 0; j < numkeys; j++) addReplyBulk(c,c->argv[keys[j]+2]);
2793 getKeysFreeResult(keys);
2794 } else {
2795 addReplyError(c, "Unknown subcommand or wrong number of arguments.");
2796 return;
2797 }
2798 }
2799
/* Format n bytes into s as a human readable string: plain bytes below
 * 1024 ("100B"), otherwise a value with two decimals and a binary unit
 * suffix K, M, G, T or P ("1.50K", "2.00G"). Values of 1024^6 bytes or more
 * fall back to the plain byte count. The caller must provide a buffer
 * large enough for the result (the longest output is a 20-digit byte count
 * plus "B" and the terminator). */
void bytesToHuman(char *s, unsigned long long n) {
    static const char units[] = "KMGTP";
    unsigned long long scale = 1024;
    size_t i;

    if (n < 1024) {
        sprintf(s,"%lluB",n);
        return;
    }

    /* scale is 1024^(i+1); pick the first unit whose range contains n. */
    for (i = 0; i < sizeof(units)-1; i++) {
        if (n < scale*1024) {
            sprintf(s,"%.2f%c",(double)n/scale,units[i]);
            return;
        }
        scale *= 1024;
    }

    /* Let's hope we never need this */
    sprintf(s,"%lluB",n);
}
2829
2830 /* Create the string returned by the INFO command. This is decoupled
2831 * by the INFO command itself as we need to report the same information
2832 * on memory corruption problems. */
genRedisInfoString(char * section)2833 sds genRedisInfoString(char *section) {
2834 sds info = sdsempty();
2835 time_t uptime = server.unixtime-server.stat_starttime;
2836 int j, numcommands;
2837 struct rusage self_ru, c_ru;
2838 unsigned long lol, bib;
2839 int allsections = 0, defsections = 0;
2840 int sections = 0;
2841
2842 if (section == NULL) section = "default";
2843 allsections = strcasecmp(section,"all") == 0;
2844 defsections = strcasecmp(section,"default") == 0;
2845
2846 getrusage(RUSAGE_SELF, &self_ru);
2847 getrusage(RUSAGE_CHILDREN, &c_ru);
2848 getClientsMaxBuffers(&lol,&bib);
2849
2850 /* Server */
2851 if (allsections || defsections || !strcasecmp(section,"server")) {
2852 static int call_uname = 1;
2853 static struct utsname name;
2854 char *mode;
2855
2856 if (server.cluster_enabled) mode = "cluster";
2857 else if (server.sentinel_mode) mode = "sentinel";
2858 else mode = "standalone";
2859
2860 if (sections++) info = sdscat(info,"\r\n");
2861
2862 if (call_uname) {
2863 /* Uname can be slow and is always the same output. Cache it. */
2864 uname(&name);
2865 call_uname = 0;
2866 }
2867
2868 info = sdscatprintf(info,
2869 "# Server\r\n"
2870 "redis_version:%s\r\n"
2871 "redis_git_sha1:%s\r\n"
2872 "redis_git_dirty:%d\r\n"
2873 "redis_build_id:%llx\r\n"
2874 "redis_mode:%s\r\n"
2875 "os:%s %s %s\r\n"
2876 "arch_bits:%d\r\n"
2877 "multiplexing_api:%s\r\n"
2878 "gcc_version:%d.%d.%d\r\n"
2879 "process_id:%ld\r\n"
2880 "run_id:%s\r\n"
2881 "tcp_port:%d\r\n"
2882 "uptime_in_seconds:%jd\r\n"
2883 "uptime_in_days:%jd\r\n"
2884 "hz:%d\r\n"
2885 "lru_clock:%ld\r\n"
2886 "executable:%s\r\n"
2887 "config_file:%s\r\n",
2888 REDIS_VERSION,
2889 redisGitSHA1(),
2890 strtol(redisGitDirty(),NULL,10) > 0,
2891 (unsigned long long) redisBuildId(),
2892 mode,
2893 name.sysname, name.release, name.machine,
2894 server.arch_bits,
2895 aeGetApiName(),
2896 #ifdef __GNUC__
2897 __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
2898 #else
2899 0,0,0,
2900 #endif
2901 (long) getpid(),
2902 server.runid,
2903 server.port,
2904 (intmax_t)uptime,
2905 (intmax_t)(uptime/(3600*24)),
2906 server.hz,
2907 (unsigned long) server.lruclock,
2908 server.executable ? server.executable : "",
2909 server.configfile ? server.configfile : "");
2910 }
2911
2912 /* Clients */
2913 if (allsections || defsections || !strcasecmp(section,"clients")) {
2914 if (sections++) info = sdscat(info,"\r\n");
2915 info = sdscatprintf(info,
2916 "# Clients\r\n"
2917 "connected_clients:%lu\r\n"
2918 "client_longest_output_list:%lu\r\n"
2919 "client_biggest_input_buf:%lu\r\n"
2920 "blocked_clients:%d\r\n",
2921 listLength(server.clients)-listLength(server.slaves),
2922 lol, bib,
2923 server.bpop_blocked_clients);
2924 }
2925
2926 /* Memory */
2927 if (allsections || defsections || !strcasecmp(section,"memory")) {
2928 char hmem[64];
2929 char peak_hmem[64];
2930 char total_system_hmem[64];
2931 char used_memory_lua_hmem[64];
2932 char used_memory_rss_hmem[64];
2933 char maxmemory_hmem[64];
2934 size_t zmalloc_used = zmalloc_used_memory();
2935 size_t total_system_mem = server.system_memory_size;
2936 const char *evict_policy = evictPolicyToString();
2937 long long memory_lua = (long long)lua_gc(server.lua,LUA_GCCOUNT,0)*1024;
2938
2939 /* Peak memory is updated from time to time by serverCron() so it
2940 * may happen that the instantaneous value is slightly bigger than
2941 * the peak value. This may confuse users, so we update the peak
2942 * if found smaller than the current memory usage. */
2943 if (zmalloc_used > server.stat_peak_memory)
2944 server.stat_peak_memory = zmalloc_used;
2945
2946 bytesToHuman(hmem,zmalloc_used);
2947 bytesToHuman(peak_hmem,server.stat_peak_memory);
2948 bytesToHuman(total_system_hmem,total_system_mem);
2949 bytesToHuman(used_memory_lua_hmem,memory_lua);
2950 bytesToHuman(used_memory_rss_hmem,server.resident_set_size);
2951 bytesToHuman(maxmemory_hmem,server.maxmemory);
2952
2953 if (sections++) info = sdscat(info,"\r\n");
2954 info = sdscatprintf(info,
2955 "# Memory\r\n"
2956 "used_memory:%zu\r\n"
2957 "used_memory_human:%s\r\n"
2958 "used_memory_rss:%zu\r\n"
2959 "used_memory_rss_human:%s\r\n"
2960 "used_memory_peak:%zu\r\n"
2961 "used_memory_peak_human:%s\r\n"
2962 "total_system_memory:%lu\r\n"
2963 "total_system_memory_human:%s\r\n"
2964 "used_memory_lua:%lld\r\n"
2965 "used_memory_lua_human:%s\r\n"
2966 "maxmemory:%lld\r\n"
2967 "maxmemory_human:%s\r\n"
2968 "maxmemory_policy:%s\r\n"
2969 "mem_fragmentation_ratio:%.2f\r\n"
2970 "mem_allocator:%s\r\n",
2971 zmalloc_used,
2972 hmem,
2973 server.resident_set_size,
2974 used_memory_rss_hmem,
2975 server.stat_peak_memory,
2976 peak_hmem,
2977 (unsigned long)total_system_mem,
2978 total_system_hmem,
2979 memory_lua,
2980 used_memory_lua_hmem,
2981 server.maxmemory,
2982 maxmemory_hmem,
2983 evict_policy,
2984 zmalloc_get_fragmentation_ratio(server.resident_set_size),
2985 ZMALLOC_LIB
2986 );
2987 }
2988
2989 /* Persistence */
2990 if (allsections || defsections || !strcasecmp(section,"persistence")) {
2991 if (sections++) info = sdscat(info,"\r\n");
2992 info = sdscatprintf(info,
2993 "# Persistence\r\n"
2994 "loading:%d\r\n"
2995 "rdb_changes_since_last_save:%lld\r\n"
2996 "rdb_bgsave_in_progress:%d\r\n"
2997 "rdb_last_save_time:%jd\r\n"
2998 "rdb_last_bgsave_status:%s\r\n"
2999 "rdb_last_bgsave_time_sec:%jd\r\n"
3000 "rdb_current_bgsave_time_sec:%jd\r\n"
3001 "aof_enabled:%d\r\n"
3002 "aof_rewrite_in_progress:%d\r\n"
3003 "aof_rewrite_scheduled:%d\r\n"
3004 "aof_last_rewrite_time_sec:%jd\r\n"
3005 "aof_current_rewrite_time_sec:%jd\r\n"
3006 "aof_last_bgrewrite_status:%s\r\n"
3007 "aof_last_write_status:%s\r\n",
3008 server.loading,
3009 server.dirty,
3010 server.rdb_child_pid != -1,
3011 (intmax_t)server.lastsave,
3012 (server.lastbgsave_status == C_OK) ? "ok" : "err",
3013 (intmax_t)server.rdb_save_time_last,
3014 (intmax_t)((server.rdb_child_pid == -1) ?
3015 -1 : time(NULL)-server.rdb_save_time_start),
3016 server.aof_state != AOF_OFF,
3017 server.aof_child_pid != -1,
3018 server.aof_rewrite_scheduled,
3019 (intmax_t)server.aof_rewrite_time_last,
3020 (intmax_t)((server.aof_child_pid == -1) ?
3021 -1 : time(NULL)-server.aof_rewrite_time_start),
3022 (server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
3023 (server.aof_last_write_status == C_OK) ? "ok" : "err");
3024
3025 if (server.aof_state != AOF_OFF) {
3026 info = sdscatprintf(info,
3027 "aof_current_size:%lld\r\n"
3028 "aof_base_size:%lld\r\n"
3029 "aof_pending_rewrite:%d\r\n"
3030 "aof_buffer_length:%zu\r\n"
3031 "aof_rewrite_buffer_length:%lu\r\n"
3032 "aof_pending_bio_fsync:%llu\r\n"
3033 "aof_delayed_fsync:%lu\r\n",
3034 (long long) server.aof_current_size,
3035 (long long) server.aof_rewrite_base_size,
3036 server.aof_rewrite_scheduled,
3037 sdslen(server.aof_buf),
3038 aofRewriteBufferSize(),
3039 bioPendingJobsOfType(BIO_AOF_FSYNC),
3040 server.aof_delayed_fsync);
3041 }
3042
3043 if (server.loading) {
3044 double perc;
3045 time_t eta, elapsed;
3046 off_t remaining_bytes = server.loading_total_bytes-
3047 server.loading_loaded_bytes;
3048
3049 perc = ((double)server.loading_loaded_bytes /
3050 (server.loading_total_bytes+1)) * 100;
3051
3052 elapsed = time(NULL)-server.loading_start_time;
3053 if (elapsed == 0) {
3054 eta = 1; /* A fake 1 second figure if we don't have
3055 enough info */
3056 } else {
3057 eta = (elapsed*remaining_bytes)/(server.loading_loaded_bytes+1);
3058 }
3059
3060 info = sdscatprintf(info,
3061 "loading_start_time:%jd\r\n"
3062 "loading_total_bytes:%llu\r\n"
3063 "loading_loaded_bytes:%llu\r\n"
3064 "loading_loaded_perc:%.2f\r\n"
3065 "loading_eta_seconds:%jd\r\n",
3066 (intmax_t) server.loading_start_time,
3067 (unsigned long long) server.loading_total_bytes,
3068 (unsigned long long) server.loading_loaded_bytes,
3069 perc,
3070 (intmax_t)eta
3071 );
3072 }
3073 }
3074
3075 /* Stats */
3076 if (allsections || defsections || !strcasecmp(section,"stats")) {
3077 if (sections++) info = sdscat(info,"\r\n");
3078 info = sdscatprintf(info,
3079 "# Stats\r\n"
3080 "total_connections_received:%lld\r\n"
3081 "total_commands_processed:%lld\r\n"
3082 "instantaneous_ops_per_sec:%lld\r\n"
3083 "total_net_input_bytes:%lld\r\n"
3084 "total_net_output_bytes:%lld\r\n"
3085 "instantaneous_input_kbps:%.2f\r\n"
3086 "instantaneous_output_kbps:%.2f\r\n"
3087 "rejected_connections:%lld\r\n"
3088 "sync_full:%lld\r\n"
3089 "sync_partial_ok:%lld\r\n"
3090 "sync_partial_err:%lld\r\n"
3091 "expired_keys:%lld\r\n"
3092 "evicted_keys:%lld\r\n"
3093 "keyspace_hits:%lld\r\n"
3094 "keyspace_misses:%lld\r\n"
3095 "pubsub_channels:%ld\r\n"
3096 "pubsub_patterns:%lu\r\n"
3097 "latest_fork_usec:%lld\r\n"
3098 "migrate_cached_sockets:%ld\r\n",
3099 server.stat_numconnections,
3100 server.stat_numcommands,
3101 getInstantaneousMetric(STATS_METRIC_COMMAND),
3102 server.stat_net_input_bytes,
3103 server.stat_net_output_bytes,
3104 (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
3105 (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
3106 server.stat_rejected_conn,
3107 server.stat_sync_full,
3108 server.stat_sync_partial_ok,
3109 server.stat_sync_partial_err,
3110 server.stat_expiredkeys,
3111 server.stat_evictedkeys,
3112 server.stat_keyspace_hits,
3113 server.stat_keyspace_misses,
3114 dictSize(server.pubsub_channels),
3115 listLength(server.pubsub_patterns),
3116 server.stat_fork_time,
3117 dictSize(server.migrate_cached_sockets));
3118 }
3119
3120 /* Replication */
3121 if (allsections || defsections || !strcasecmp(section,"replication")) {
3122 if (sections++) info = sdscat(info,"\r\n");
3123 info = sdscatprintf(info,
3124 "# Replication\r\n"
3125 "role:%s\r\n",
3126 server.masterhost == NULL ? "master" : "slave");
3127 if (server.masterhost) {
3128 long long slave_repl_offset = 1;
3129
3130 if (server.master)
3131 slave_repl_offset = server.master->reploff;
3132 else if (server.cached_master)
3133 slave_repl_offset = server.cached_master->reploff;
3134
3135 info = sdscatprintf(info,
3136 "master_host:%s\r\n"
3137 "master_port:%d\r\n"
3138 "master_link_status:%s\r\n"
3139 "master_last_io_seconds_ago:%d\r\n"
3140 "master_sync_in_progress:%d\r\n"
3141 "slave_repl_offset:%lld\r\n"
3142 ,server.masterhost,
3143 server.masterport,
3144 (server.repl_state == REPL_STATE_CONNECTED) ?
3145 "up" : "down",
3146 server.master ?
3147 ((int)(server.unixtime-server.master->lastinteraction)) : -1,
3148 server.repl_state == REPL_STATE_TRANSFER,
3149 slave_repl_offset
3150 );
3151
3152 if (server.repl_state == REPL_STATE_TRANSFER) {
3153 info = sdscatprintf(info,
3154 "master_sync_left_bytes:%lld\r\n"
3155 "master_sync_last_io_seconds_ago:%d\r\n"
3156 , (long long)
3157 (server.repl_transfer_size - server.repl_transfer_read),
3158 (int)(server.unixtime-server.repl_transfer_lastio)
3159 );
3160 }
3161
3162 if (server.repl_state != REPL_STATE_CONNECTED) {
3163 info = sdscatprintf(info,
3164 "master_link_down_since_seconds:%jd\r\n",
3165 (intmax_t)server.unixtime-server.repl_down_since);
3166 }
3167 info = sdscatprintf(info,
3168 "slave_priority:%d\r\n"
3169 "slave_read_only:%d\r\n",
3170 server.slave_priority,
3171 server.repl_slave_ro);
3172 }
3173
3174 info = sdscatprintf(info,
3175 "connected_slaves:%lu\r\n",
3176 listLength(server.slaves));
3177
3178 /* If min-slaves-to-write is active, write the number of slaves
3179 * currently considered 'good'. */
3180 if (server.repl_min_slaves_to_write &&
3181 server.repl_min_slaves_max_lag) {
3182 info = sdscatprintf(info,
3183 "min_slaves_good_slaves:%d\r\n",
3184 server.repl_good_slaves_count);
3185 }
3186
3187 if (listLength(server.slaves)) {
3188 int slaveid = 0;
3189 listNode *ln;
3190 listIter li;
3191
3192 listRewind(server.slaves,&li);
3193 while((ln = listNext(&li))) {
3194 client *slave = listNodeValue(ln);
3195 char *state = NULL;
3196 char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
3197 int port;
3198 long lag = 0;
3199
3200 if (slaveip[0] == '\0') {
3201 if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) == -1)
3202 continue;
3203 slaveip = ip;
3204 }
3205 switch(slave->replstate) {
3206 case SLAVE_STATE_WAIT_BGSAVE_START:
3207 case SLAVE_STATE_WAIT_BGSAVE_END:
3208 state = "wait_bgsave";
3209 break;
3210 case SLAVE_STATE_SEND_BULK:
3211 state = "send_bulk";
3212 break;
3213 case SLAVE_STATE_ONLINE:
3214 state = "online";
3215 break;
3216 }
3217 if (state == NULL) continue;
3218 if (slave->replstate == SLAVE_STATE_ONLINE)
3219 lag = time(NULL) - slave->repl_ack_time;
3220
3221 info = sdscatprintf(info,
3222 "slave%d:ip=%s,port=%d,state=%s,"
3223 "offset=%lld,lag=%ld\r\n",
3224 slaveid,slaveip,slave->slave_listening_port,state,
3225 slave->repl_ack_off, lag);
3226 slaveid++;
3227 }
3228 }
3229 info = sdscatprintf(info,
3230 "master_repl_offset:%lld\r\n"
3231 "repl_backlog_active:%d\r\n"
3232 "repl_backlog_size:%lld\r\n"
3233 "repl_backlog_first_byte_offset:%lld\r\n"
3234 "repl_backlog_histlen:%lld\r\n",
3235 server.master_repl_offset,
3236 server.repl_backlog != NULL,
3237 server.repl_backlog_size,
3238 server.repl_backlog_off,
3239 server.repl_backlog_histlen);
3240 }
3241
3242 /* CPU */
3243 if (allsections || defsections || !strcasecmp(section,"cpu")) {
3244 if (sections++) info = sdscat(info,"\r\n");
3245 info = sdscatprintf(info,
3246 "# CPU\r\n"
3247 "used_cpu_sys:%.2f\r\n"
3248 "used_cpu_user:%.2f\r\n"
3249 "used_cpu_sys_children:%.2f\r\n"
3250 "used_cpu_user_children:%.2f\r\n",
3251 (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000,
3252 (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000,
3253 (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
3254 (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000);
3255 }
3256
3257 /* cmdtime */
3258 if (allsections || !strcasecmp(section,"commandstats")) {
3259 if (sections++) info = sdscat(info,"\r\n");
3260 info = sdscatprintf(info, "# Commandstats\r\n");
3261 numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
3262 for (j = 0; j < numcommands; j++) {
3263 struct redisCommand *c = redisCommandTable+j;
3264
3265 if (!c->calls) continue;
3266 info = sdscatprintf(info,
3267 "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f\r\n",
3268 c->name, c->calls, c->microseconds,
3269 (c->calls == 0) ? 0 : ((float)c->microseconds/c->calls));
3270 }
3271 }
3272
3273 /* Cluster */
3274 if (allsections || defsections || !strcasecmp(section,"cluster")) {
3275 if (sections++) info = sdscat(info,"\r\n");
3276 info = sdscatprintf(info,
3277 "# Cluster\r\n"
3278 "cluster_enabled:%d\r\n",
3279 server.cluster_enabled);
3280 }
3281
3282 /* Key space */
3283 if (allsections || defsections || !strcasecmp(section,"keyspace")) {
3284 if (sections++) info = sdscat(info,"\r\n");
3285 info = sdscatprintf(info, "# Keyspace\r\n");
3286 for (j = 0; j < server.dbnum; j++) {
3287 long long keys, vkeys;
3288
3289 keys = dictSize(server.db[j].dict);
3290 vkeys = dictSize(server.db[j].expires);
3291 if (keys || vkeys) {
3292 info = sdscatprintf(info,
3293 "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
3294 j, keys, vkeys, server.db[j].avg_ttl);
3295 }
3296 }
3297 }
3298 return info;
3299 }
3300
infoCommand(client * c)3301 void infoCommand(client *c) {
3302 char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
3303
3304 if (c->argc > 2) {
3305 addReply(c,shared.syntaxerr);
3306 return;
3307 }
3308 addReplyBulkSds(c, genRedisInfoString(section));
3309 }
3310
monitorCommand(client * c)3311 void monitorCommand(client *c) {
3312 /* ignore MONITOR if already slave or in monitor mode */
3313 if (c->flags & CLIENT_SLAVE) return;
3314
3315 c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
3316 listAddNodeTail(server.monitors,c);
3317 addReply(c,shared.ok);
3318 }
3319
3320 /* ============================ Maxmemory directive ======================== */
3321
3322 /* freeMemoryIfNeeded() gets called when 'maxmemory' is set on the config
3323 * file to limit the max memory used by the server, before processing a
3324 * command.
3325 *
3326 * The goal of the function is to free enough memory to keep Redis under the
3327 * configured memory limit.
3328 *
 * The function starts by calculating how many bytes should be freed to keep
 * Redis under the limit, and then enters a loop selecting the best keys to
 * evict according to the configured policy.
3332 *
3333 * If all the bytes needed to return back under the limit were freed the
3334 * function returns C_OK, otherwise C_ERR is returned, and the caller
3335 * should block the execution of commands that will result in more memory
3336 * used by the server.
3337 *
3338 * ------------------------------------------------------------------------
3339 *
3340 * LRU approximation algorithm
3341 *
 * Redis uses an approximation of the LRU algorithm that runs in constant
 * memory. Every time there is a key to evict, we sample N keys (with
 * N very small, usually around 5) to populate a pool of the M best keys
 * to evict (the pool size M is defined by MAXMEMORY_EVICTION_POOL_SIZE).
 *
 * The N sampled keys are added to the pool of good keys to evict (the ones
 * with an old access time) if they are better than one of the current keys
 * in the pool.
3350 *
3351 * After the pool is populated, the best key we have in the pool is expired.
3352 * However note that we don't remove keys from the pool when they are deleted
3353 * so the pool may contain keys that no longer exist.
3354 *
 * When we try to evict a key and none of the entries in the pool exist
 * anymore, we populate it again. This time we'll be sure that the pool has
 * at least one key that can be evicted, if there is at least one key that
 * can be evicted in the whole database. */
3359
3360 /* Create a new eviction pool. */
evictionPoolAlloc(void)3361 struct evictionPoolEntry *evictionPoolAlloc(void) {
3362 struct evictionPoolEntry *ep;
3363 int j;
3364
3365 ep = zmalloc(sizeof(*ep)*MAXMEMORY_EVICTION_POOL_SIZE);
3366 for (j = 0; j < MAXMEMORY_EVICTION_POOL_SIZE; j++) {
3367 ep[j].idle = 0;
3368 ep[j].key = NULL;
3369 }
3370 return ep;
3371 }
3372
3373 /* This is an helper function for freeMemoryIfNeeded(), it is used in order
3374 * to populate the evictionPool with a few entries every time we want to
3375 * expire a key. Keys with idle time smaller than one of the current
3376 * keys are added. Keys are always added if there are free entries.
3377 *
3378 * We insert keys on place in ascending order, so keys with the smaller
3379 * idle time are on the left, and keys with the higher idle time on the
3380 * right. */
3381
3382 #define EVICTION_SAMPLES_ARRAY_SIZE 16
evictionPoolPopulate(dict * sampledict,dict * keydict,struct evictionPoolEntry * pool)3383 void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
3384 int j, k, count;
3385 dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
3386 dictEntry **samples;
3387
3388 /* Try to use a static buffer: this function is a big hit...
3389 * Note: it was actually measured that this helps. */
3390 if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
3391 samples = _samples;
3392 } else {
3393 samples = zmalloc(sizeof(samples[0])*server.maxmemory_samples);
3394 }
3395
3396 count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
3397 for (j = 0; j < count; j++) {
3398 unsigned long long idle;
3399 sds key;
3400 robj *o;
3401 dictEntry *de;
3402
3403 de = samples[j];
3404 key = dictGetKey(de);
3405 /* If the dictionary we are sampling from is not the main
3406 * dictionary (but the expires one) we need to lookup the key
3407 * again in the key dictionary to obtain the value object. */
3408 if (sampledict != keydict) de = dictFind(keydict, key);
3409 o = dictGetVal(de);
3410 idle = estimateObjectIdleTime(o);
3411
3412 /* Insert the element inside the pool.
3413 * First, find the first empty bucket or the first populated
3414 * bucket that has an idle time smaller than our idle time. */
3415 k = 0;
3416 while (k < MAXMEMORY_EVICTION_POOL_SIZE &&
3417 pool[k].key &&
3418 pool[k].idle < idle) k++;
3419 if (k == 0 && pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key != NULL) {
3420 /* Can't insert if the element is < the worst element we have
3421 * and there are no empty buckets. */
3422 continue;
3423 } else if (k < MAXMEMORY_EVICTION_POOL_SIZE && pool[k].key == NULL) {
3424 /* Inserting into empty position. No setup needed before insert. */
3425 } else {
3426 /* Inserting in the middle. Now k points to the first element
3427 * greater than the element to insert. */
3428 if (pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key == NULL) {
3429 /* Free space on the right? Insert at k shifting
3430 * all the elements from k to end to the right. */
3431 memmove(pool+k+1,pool+k,
3432 sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
3433 } else {
3434 /* No free space on right? Insert at k-1 */
3435 k--;
3436 /* Shift all elements on the left of k (included) to the
3437 * left, so we discard the element with smaller idle time. */
3438 sdsfree(pool[0].key);
3439 memmove(pool,pool+1,sizeof(pool[0])*k);
3440 }
3441 }
3442 pool[k].key = sdsdup(key);
3443 pool[k].idle = idle;
3444 }
3445 if (samples != _samples) zfree(samples);
3446 }
3447
freeMemoryIfNeeded(void)3448 int freeMemoryIfNeeded(void) {
3449 size_t mem_used, mem_tofree, mem_freed;
3450 int slaves = listLength(server.slaves);
3451 mstime_t latency, eviction_latency;
3452
3453 /* Remove the size of slaves output buffers and AOF buffer from the
3454 * count of used memory. */
3455 mem_used = zmalloc_used_memory();
3456 if (slaves) {
3457 listIter li;
3458 listNode *ln;
3459
3460 listRewind(server.slaves,&li);
3461 while((ln = listNext(&li))) {
3462 client *slave = listNodeValue(ln);
3463 unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
3464 if (obuf_bytes > mem_used)
3465 mem_used = 0;
3466 else
3467 mem_used -= obuf_bytes;
3468 }
3469 }
3470 if (server.aof_state != AOF_OFF) {
3471 mem_used -= sdslen(server.aof_buf);
3472 mem_used -= aofRewriteBufferSize();
3473 }
3474
3475 /* Check if we are over the memory limit. */
3476 if (mem_used <= server.maxmemory) return C_OK;
3477
3478 if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
3479 return C_ERR; /* We need to free memory, but policy forbids. */
3480
3481 /* Compute how much memory we need to free. */
3482 mem_tofree = mem_used - server.maxmemory;
3483 mem_freed = 0;
3484 latencyStartMonitor(latency);
3485 while (mem_freed < mem_tofree) {
3486 int j, k, keys_freed = 0;
3487
3488 for (j = 0; j < server.dbnum; j++) {
3489 long bestval = 0; /* just to prevent warning */
3490 sds bestkey = NULL;
3491 dictEntry *de;
3492 redisDb *db = server.db+j;
3493 dict *dict;
3494
3495 if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
3496 server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
3497 {
3498 dict = server.db[j].dict;
3499 } else {
3500 dict = server.db[j].expires;
3501 }
3502 if (dictSize(dict) == 0) continue;
3503
3504 /* volatile-random and allkeys-random policy */
3505 if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
3506 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
3507 {
3508 de = dictGetRandomKey(dict);
3509 bestkey = dictGetKey(de);
3510 }
3511
3512 /* volatile-lru and allkeys-lru policy */
3513 else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
3514 server.maxmemory_policy == MAXMEMORY_VOLATILE_LRU)
3515 {
3516 struct evictionPoolEntry *pool = db->eviction_pool;
3517
3518 while(bestkey == NULL) {
3519 evictionPoolPopulate(dict, db->dict, db->eviction_pool);
3520 /* Go backward from best to worst element to evict. */
3521 for (k = MAXMEMORY_EVICTION_POOL_SIZE-1; k >= 0; k--) {
3522 if (pool[k].key == NULL) continue;
3523 de = dictFind(dict,pool[k].key);
3524
3525 /* Remove the entry from the pool. */
3526 sdsfree(pool[k].key);
3527 /* Shift all elements on its right to left. */
3528 memmove(pool+k,pool+k+1,
3529 sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
3530 /* Clear the element on the right which is empty
3531 * since we shifted one position to the left. */
3532 pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key = NULL;
3533 pool[MAXMEMORY_EVICTION_POOL_SIZE-1].idle = 0;
3534
3535 /* If the key exists, is our pick. Otherwise it is
3536 * a ghost and we need to try the next element. */
3537 if (de) {
3538 bestkey = dictGetKey(de);
3539 break;
3540 } else {
3541 /* Ghost... */
3542 continue;
3543 }
3544 }
3545 }
3546 }
3547
3548 /* volatile-ttl */
3549 else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
3550 for (k = 0; k < server.maxmemory_samples; k++) {
3551 sds thiskey;
3552 long thisval;
3553
3554 de = dictGetRandomKey(dict);
3555 thiskey = dictGetKey(de);
3556 thisval = (long) dictGetVal(de);
3557
3558 /* Expire sooner (minor expire unix timestamp) is better
3559 * candidate for deletion */
3560 if (bestkey == NULL || thisval < bestval) {
3561 bestkey = thiskey;
3562 bestval = thisval;
3563 }
3564 }
3565 }
3566
3567 /* Finally remove the selected key. */
3568 if (bestkey) {
3569 long long delta;
3570
3571 robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
3572 propagateExpire(db,keyobj);
3573 /* We compute the amount of memory freed by dbDelete() alone.
3574 * It is possible that actually the memory needed to propagate
3575 * the DEL in AOF and replication link is greater than the one
3576 * we are freeing removing the key, but we can't account for
3577 * that otherwise we would never exit the loop.
3578 *
3579 * AOF and Output buffer memory will be freed eventually so
3580 * we only care about memory used by the key space. */
3581 delta = (long long) zmalloc_used_memory();
3582 latencyStartMonitor(eviction_latency);
3583 dbDelete(db,keyobj);
3584 latencyEndMonitor(eviction_latency);
3585 latencyAddSampleIfNeeded("eviction-del",eviction_latency);
3586 latencyRemoveNestedEvent(latency,eviction_latency);
3587 delta -= (long long) zmalloc_used_memory();
3588 mem_freed += delta;
3589 server.stat_evictedkeys++;
3590 notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
3591 keyobj, db->id);
3592 decrRefCount(keyobj);
3593 keys_freed++;
3594
3595 /* When the memory to free starts to be big enough, we may
3596 * start spending so much time here that is impossible to
3597 * deliver data to the slaves fast enough, so we force the
3598 * transmission here inside the loop. */
3599 if (slaves) flushSlavesOutputBuffers();
3600 }
3601 }
3602 if (!keys_freed) {
3603 latencyEndMonitor(latency);
3604 latencyAddSampleIfNeeded("eviction-cycle",latency);
3605 return C_ERR; /* nothing to free... */
3606 }
3607 }
3608 latencyEndMonitor(latency);
3609 latencyAddSampleIfNeeded("eviction-cycle",latency);
3610 return C_OK;
3611 }
3612
3613 /* =================================== Main! ================================ */
3614
3615 #ifdef __linux__
linuxOvercommitMemoryValue(void)3616 int linuxOvercommitMemoryValue(void) {
3617 FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
3618 char buf[64];
3619
3620 if (!fp) return -1;
3621 if (fgets(buf,64,fp) == NULL) {
3622 fclose(fp);
3623 return -1;
3624 }
3625 fclose(fp);
3626
3627 return atoi(buf);
3628 }
3629
linuxMemoryWarnings(void)3630 void linuxMemoryWarnings(void) {
3631 if (linuxOvercommitMemoryValue() == 0) {
3632 serverLog(LL_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
3633 }
3634 if (THPIsEnabled()) {
3635 serverLog(LL_WARNING,"WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.");
3636 }
3637 }
3638 #endif /* __linux__ */
3639
createPidFile(void)3640 void createPidFile(void) {
3641 /* If pidfile requested, but no pidfile defined, use
3642 * default pidfile path */
3643 if (!server.pidfile) server.pidfile = zstrdup(CONFIG_DEFAULT_PID_FILE);
3644
3645 /* Try to write the pid file in a best-effort way. */
3646 FILE *fp = fopen(server.pidfile,"w");
3647 if (fp) {
3648 fprintf(fp,"%d\n",(int)getpid());
3649 fclose(fp);
3650 }
3651 }
3652
daemonize(void)3653 void daemonize(void) {
3654 int fd;
3655
3656 if (fork() != 0) exit(0); /* parent exits */
3657 setsid(); /* create a new session */
3658
3659 /* Every output goes to /dev/null. If Redis is daemonized but
3660 * the 'logfile' is set to 'stdout' in the configuration file
3661 * it will not log at all. */
3662 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
3663 dup2(fd, STDIN_FILENO);
3664 dup2(fd, STDOUT_FILENO);
3665 dup2(fd, STDERR_FILENO);
3666 if (fd > STDERR_FILENO) close(fd);
3667 }
3668 }
3669
version(void)3670 void version(void) {
3671 printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n",
3672 REDIS_VERSION,
3673 redisGitSHA1(),
3674 atoi(redisGitDirty()) > 0,
3675 ZMALLOC_LIB,
3676 sizeof(long) == 4 ? 32 : 64,
3677 (unsigned long long) redisBuildId());
3678 exit(0);
3679 }
3680
usage(void)3681 void usage(void) {
3682 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf] [options]\n");
3683 fprintf(stderr," ./redis-server - (read config from stdin)\n");
3684 fprintf(stderr," ./redis-server -v or --version\n");
3685 fprintf(stderr," ./redis-server -h or --help\n");
3686 fprintf(stderr," ./redis-server --test-memory <megabytes>\n\n");
3687 fprintf(stderr,"Examples:\n");
3688 fprintf(stderr," ./redis-server (run the server with default conf)\n");
3689 fprintf(stderr," ./redis-server /etc/redis/6379.conf\n");
3690 fprintf(stderr," ./redis-server --port 7777\n");
3691 fprintf(stderr," ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n");
3692 fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n\n");
3693 fprintf(stderr,"Sentinel mode:\n");
3694 fprintf(stderr," ./redis-server /etc/sentinel.conf --sentinel\n");
3695 exit(1);
3696 }
3697
redisAsciiArt(void)3698 void redisAsciiArt(void) {
3699 #include "asciilogo.h"
3700 char *buf = zmalloc(1024*16);
3701 char *mode;
3702
3703 if (server.cluster_enabled) mode = "cluster";
3704 else if (server.sentinel_mode) mode = "sentinel";
3705 else mode = "standalone";
3706
3707 if (server.syslog_enabled) {
3708 serverLog(LL_NOTICE,
3709 "Redis %s (%s/%d) %s bit, %s mode, port %d, pid %ld ready to start.",
3710 REDIS_VERSION,
3711 redisGitSHA1(),
3712 strtol(redisGitDirty(),NULL,10) > 0,
3713 (sizeof(long) == 8) ? "64" : "32",
3714 mode, server.port,
3715 (long) getpid()
3716 );
3717 } else {
3718 snprintf(buf,1024*16,ascii_logo,
3719 REDIS_VERSION,
3720 redisGitSHA1(),
3721 strtol(redisGitDirty(),NULL,10) > 0,
3722 (sizeof(long) == 8) ? "64" : "32",
3723 mode, server.port,
3724 (long) getpid()
3725 );
3726 serverLogRaw(LL_NOTICE|LL_RAW,buf);
3727 }
3728 zfree(buf);
3729 }
3730
sigShutdownHandler(int sig)3731 static void sigShutdownHandler(int sig) {
3732 char *msg;
3733
3734 switch (sig) {
3735 case SIGINT:
3736 msg = "Received SIGINT scheduling shutdown...";
3737 break;
3738 case SIGTERM:
3739 msg = "Received SIGTERM scheduling shutdown...";
3740 break;
3741 default:
3742 msg = "Received shutdown signal, scheduling shutdown...";
3743 };
3744
3745 /* SIGINT is often delivered via Ctrl+C in an interactive session.
3746 * If we receive the signal the second time, we interpret this as
3747 * the user really wanting to quit ASAP without waiting to persist
3748 * on disk. */
3749 if (server.shutdown_asap && sig == SIGINT) {
3750 serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
3751 rdbRemoveTempFile(getpid());
3752 exit(1); /* Exit with an error since this was not a clean shutdown. */
3753 } else if (server.loading) {
3754 exit(0);
3755 }
3756
3757 serverLogFromHandler(LL_WARNING, msg);
3758 server.shutdown_asap = 1;
3759 }
3760
setupSignalHandlers(void)3761 void setupSignalHandlers(void) {
3762 struct sigaction act;
3763
3764 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
3765 * Otherwise, sa_handler is used. */
3766 sigemptyset(&act.sa_mask);
3767 act.sa_flags = 0;
3768 act.sa_handler = sigShutdownHandler;
3769 sigaction(SIGTERM, &act, NULL);
3770 sigaction(SIGINT, &act, NULL);
3771
3772 #ifdef HAVE_BACKTRACE
3773 sigemptyset(&act.sa_mask);
3774 act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
3775 act.sa_sigaction = sigsegvHandler;
3776 sigaction(SIGSEGV, &act, NULL);
3777 sigaction(SIGBUS, &act, NULL);
3778 sigaction(SIGFPE, &act, NULL);
3779 sigaction(SIGILL, &act, NULL);
3780 #endif
3781 return;
3782 }
3783
/* Memory test routine invoked by main() for the --test-memory option.
 * Its definition lives outside this section (presumably memtest.c; confirm). */
void memtest(size_t megabytes, int passes);
3785
/* Returns 1 if the server should start in Sentinel mode, 0 otherwise.
 *
 * Sentinel mode is selected when argv[0] contains the substring
 * "redis-sentinel" (so "/usr/bin/redis-sentinel" matches too). It is also
 * selected when any argument after argv[0] is exactly "--sentinel". */
int checkForSentinelMode(int argc, char **argv) {
    int sentinel = (strstr(argv[0],"redis-sentinel") != NULL);
    int j = 1;

    while (!sentinel && j < argc) {
        sentinel = (strcmp(argv[j],"--sentinel") == 0);
        j++;
    }
    return sentinel;
}
3796
3797 /* Function called at startup to load RDB or AOF file in memory. */
loadDataFromDisk(void)3798 void loadDataFromDisk(void) {
3799 long long start = ustime();
3800 if (server.aof_state == AOF_ON) {
3801 if (loadAppendOnlyFile(server.aof_filename) == C_OK)
3802 serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
3803 } else {
3804 if (rdbLoad(server.rdb_filename) == C_OK) {
3805 serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
3806 (float)(ustime()-start)/1000000);
3807 } else if (errno != ENOENT) {
3808 serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
3809 exit(1);
3810 }
3811 }
3812 }
3813
redisOutOfMemoryHandler(size_t allocation_size)3814 void redisOutOfMemoryHandler(size_t allocation_size) {
3815 serverLog(LL_WARNING,"Out Of Memory allocating %zu bytes!",
3816 allocation_size);
3817 serverPanic("Redis aborting for OUT OF MEMORY");
3818 }
3819
redisSetProcTitle(char * title)3820 void redisSetProcTitle(char *title) {
3821 #ifdef USE_SETPROCTITLE
3822 char *server_mode = "";
3823 if (server.cluster_enabled) server_mode = " [cluster]";
3824 else if (server.sentinel_mode) server_mode = " [sentinel]";
3825
3826 setproctitle("%s %s:%d%s",
3827 title,
3828 server.bindaddr_count ? server.bindaddr[0] : "*",
3829 server.port,
3830 server_mode);
3831 #else
3832 UNUSED(title);
3833 #endif
3834 }
3835
3836 /*
3837 * Check whether systemd or upstart have been used to start redis.
3838 */
3839
redisSupervisedUpstart(void)3840 int redisSupervisedUpstart(void) {
3841 const char *upstart_job = getenv("UPSTART_JOB");
3842
3843 if (!upstart_job) {
3844 serverLog(LL_WARNING,
3845 "upstart supervision requested, but UPSTART_JOB not found");
3846 return 0;
3847 }
3848
3849 serverLog(LL_NOTICE, "supervised by upstart, will stop to signal readiness");
3850 raise(SIGSTOP);
3851 unsetenv("UPSTART_JOB");
3852 return 1;
3853 }
3854
redisSupervisedSystemd(void)3855 int redisSupervisedSystemd(void) {
3856 const char *notify_socket = getenv("NOTIFY_SOCKET");
3857 int fd = 1;
3858 struct sockaddr_un su;
3859 struct iovec iov;
3860 struct msghdr hdr;
3861 int sendto_flags = 0;
3862
3863 if (!notify_socket) {
3864 serverLog(LL_WARNING,
3865 "systemd supervision requested, but NOTIFY_SOCKET not found");
3866 return 0;
3867 }
3868
3869 if ((strchr("@/", notify_socket[0])) == NULL || strlen(notify_socket) < 2) {
3870 return 0;
3871 }
3872
3873 serverLog(LL_NOTICE, "supervised by systemd, will signal readiness");
3874 if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) {
3875 serverLog(LL_WARNING,
3876 "Can't connect to systemd socket %s", notify_socket);
3877 return 0;
3878 }
3879
3880 memset(&su, 0, sizeof(su));
3881 su.sun_family = AF_UNIX;
3882 strncpy (su.sun_path, notify_socket, sizeof(su.sun_path) -1);
3883 su.sun_path[sizeof(su.sun_path) - 1] = '\0';
3884
3885 if (notify_socket[0] == '@')
3886 su.sun_path[0] = '\0';
3887
3888 memset(&iov, 0, sizeof(iov));
3889 iov.iov_base = "READY=1";
3890 iov.iov_len = strlen("READY=1");
3891
3892 memset(&hdr, 0, sizeof(hdr));
3893 hdr.msg_name = &su;
3894 hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) +
3895 strlen(notify_socket);
3896 hdr.msg_iov = &iov;
3897 hdr.msg_iovlen = 1;
3898
3899 unsetenv("NOTIFY_SOCKET");
3900 #ifdef HAVE_MSG_NOSIGNAL
3901 sendto_flags |= MSG_NOSIGNAL;
3902 #endif
3903 if (sendmsg(fd, &hdr, sendto_flags) < 0) {
3904 serverLog(LL_WARNING, "Can't send notification to systemd");
3905 close(fd);
3906 return 0;
3907 }
3908 close(fd);
3909 return 1;
3910 }
3911
redisIsSupervised(int mode)3912 int redisIsSupervised(int mode) {
3913 if (mode == SUPERVISED_AUTODETECT) {
3914 const char *upstart_job = getenv("UPSTART_JOB");
3915 const char *notify_socket = getenv("NOTIFY_SOCKET");
3916
3917 if (upstart_job) {
3918 redisSupervisedUpstart();
3919 } else if (notify_socket) {
3920 redisSupervisedSystemd();
3921 }
3922 } else if (mode == SUPERVISED_UPSTART) {
3923 return redisSupervisedUpstart();
3924 } else if (mode == SUPERVISED_SYSTEMD) {
3925 return redisSupervisedSystemd();
3926 }
3927
3928 return 0;
3929 }
3930
3931
main(int argc,char ** argv)3932 int main(int argc, char **argv) {
3933 struct timeval tv;
3934 int j;
3935
3936 #ifdef REDIS_TEST
3937 if (argc == 3 && !strcasecmp(argv[1], "test")) {
3938 if (!strcasecmp(argv[2], "ziplist")) {
3939 return ziplistTest(argc, argv);
3940 } else if (!strcasecmp(argv[2], "quicklist")) {
3941 quicklistTest(argc, argv);
3942 } else if (!strcasecmp(argv[2], "intset")) {
3943 return intsetTest(argc, argv);
3944 } else if (!strcasecmp(argv[2], "zipmap")) {
3945 return zipmapTest(argc, argv);
3946 } else if (!strcasecmp(argv[2], "sha1test")) {
3947 return sha1Test(argc, argv);
3948 } else if (!strcasecmp(argv[2], "util")) {
3949 return utilTest(argc, argv);
3950 } else if (!strcasecmp(argv[2], "sds")) {
3951 return sdsTest(argc, argv);
3952 } else if (!strcasecmp(argv[2], "endianconv")) {
3953 return endianconvTest(argc, argv);
3954 } else if (!strcasecmp(argv[2], "crc64")) {
3955 return crc64Test(argc, argv);
3956 }
3957
3958 return -1; /* test not found */
3959 }
3960 #endif
3961
3962 /* We need to initialize our libraries, and the server configuration. */
3963 #ifdef INIT_SETPROCTITLE_REPLACEMENT
3964 spt_init(argc, argv);
3965 #endif
3966 setlocale(LC_COLLATE,"");
3967 zmalloc_enable_thread_safeness();
3968 zmalloc_set_oom_handler(redisOutOfMemoryHandler);
3969 srand(time(NULL)^getpid());
3970 gettimeofday(&tv,NULL);
3971 dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
3972 server.sentinel_mode = checkForSentinelMode(argc,argv);
3973 initServerConfig();
3974
3975 /* Store the executable path and arguments in a safe place in order
3976 * to be able to restart the server later. */
3977 server.executable = getAbsolutePath(argv[0]);
3978 server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
3979 server.exec_argv[argc] = NULL;
3980 for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
3981
3982 /* We need to init sentinel right now as parsing the configuration file
3983 * in sentinel mode will have the effect of populating the sentinel
3984 * data structures with master nodes to monitor. */
3985 if (server.sentinel_mode) {
3986 initSentinelConfig();
3987 initSentinel();
3988 }
3989
3990 /* Check if we need to start in redis-check-rdb mode. We just execute
3991 * the program main. However the program is part of the Redis executable
3992 * so that we can easily execute an RDB check on loading errors. */
3993 if (strstr(argv[0],"redis-check-rdb") != NULL)
3994 redis_check_rdb_main(argc,argv);
3995
3996 if (argc >= 2) {
3997 j = 1; /* First option to parse in argv[] */
3998 sds options = sdsempty();
3999 char *configfile = NULL;
4000
4001 /* Handle special options --help and --version */
4002 if (strcmp(argv[1], "-v") == 0 ||
4003 strcmp(argv[1], "--version") == 0) version();
4004 if (strcmp(argv[1], "--help") == 0 ||
4005 strcmp(argv[1], "-h") == 0) usage();
4006 if (strcmp(argv[1], "--test-memory") == 0) {
4007 if (argc == 3) {
4008 memtest(atoi(argv[2]),50);
4009 exit(0);
4010 } else {
4011 fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
4012 fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
4013 exit(1);
4014 }
4015 }
4016
4017 /* First argument is the config file name? */
4018 if (argv[j][0] != '-' || argv[j][1] != '-') {
4019 configfile = argv[j];
4020 server.configfile = getAbsolutePath(configfile);
4021 /* Replace the config file in server.exec_argv with
4022 * its absoulte path. */
4023 zfree(server.exec_argv[j]);
4024 server.exec_argv[j] = zstrdup(server.configfile);
4025 j++;
4026 }
4027
4028 /* All the other options are parsed and conceptually appended to the
4029 * configuration file. For instance --port 6380 will generate the
4030 * string "port 6380\n" to be parsed after the actual file name
4031 * is parsed, if any. */
4032 while(j != argc) {
4033 if (argv[j][0] == '-' && argv[j][1] == '-') {
4034 /* Option name */
4035 if (!strcmp(argv[j], "--check-rdb")) {
4036 /* Argument has no options, need to skip for parsing. */
4037 j++;
4038 continue;
4039 }
4040 if (sdslen(options)) options = sdscat(options,"\n");
4041 options = sdscat(options,argv[j]+2);
4042 options = sdscat(options," ");
4043 } else {
4044 /* Option argument */
4045 options = sdscatrepr(options,argv[j],strlen(argv[j]));
4046 options = sdscat(options," ");
4047 }
4048 j++;
4049 }
4050 if (server.sentinel_mode && configfile && *configfile == '-') {
4051 serverLog(LL_WARNING,
4052 "Sentinel config from STDIN not allowed.");
4053 serverLog(LL_WARNING,
4054 "Sentinel needs config file on disk to save state. Exiting...");
4055 exit(1);
4056 }
4057 resetServerSaveParams();
4058 loadServerConfig(configfile,options);
4059 sdsfree(options);
4060 } else {
4061 serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
4062 }
4063
4064 server.supervised = redisIsSupervised(server.supervised_mode);
4065 int background = server.daemonize && !server.supervised;
4066 if (background) daemonize();
4067
4068 initServer();
4069 if (background || server.pidfile) createPidFile();
4070 redisSetProcTitle(argv[0]);
4071 redisAsciiArt();
4072 checkTcpBacklogSettings();
4073
4074 if (!server.sentinel_mode) {
4075 /* Things not needed when running in Sentinel mode. */
4076 serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION);
4077 #ifdef __linux__
4078 linuxMemoryWarnings();
4079 #endif
4080 loadDataFromDisk();
4081 if (server.cluster_enabled) {
4082 if (verifyClusterConfigWithData() == C_ERR) {
4083 serverLog(LL_WARNING,
4084 "You can't have keys in a DB different than DB 0 when in "
4085 "Cluster mode. Exiting.");
4086 exit(1);
4087 }
4088 }
4089 if (server.ipfd_count > 0)
4090 serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4091 if (server.sofd > 0)
4092 serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
4093 } else {
4094 sentinelIsRunning();
4095 }
4096
4097 /* Warning the user about suspicious maxmemory setting. */
4098 if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
4099 serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
4100 }
4101
4102 aeSetBeforeSleepProc(server.el,beforeSleep);
4103 aeMain(server.el);
4104 aeDeleteEventLoop(server.el);
4105 return 0;
4106 }
4107
4108 /* The End */
4109