xref: /f-stack/app/redis-5.0.5/src/aof.c (revision 572c4311)
1 /*
2  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *   * Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *   * Redistributions in binary form must reproduce the above copyright
11  *     notice, this list of conditions and the following disclaimer in the
12  *     documentation and/or other materials provided with the distribution.
13  *   * Neither the name of Redis nor the names of its contributors may be used
14  *     to endorse or promote products derived from this software without
15  *     specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "server.h"
31 #include "bio.h"
32 #include "rio.h"
33 
34 #include <signal.h>
35 #include <fcntl.h>
36 #include <sys/stat.h>
37 #include <sys/types.h>
38 #include <sys/time.h>
39 #include <sys/resource.h>
40 #include <sys/wait.h>
41 #include <sys/param.h>
42 
/* Forward declarations: these functions are called in this file before the
 * point where they are defined (the definitions are not visible in this
 * excerpt; presumably further down in aof.c -- confirm). */
void aofUpdateCurrentSize(void);
void aofClosePipes(void);
45 
/* ----------------------------------------------------------------------------
 * AOF rewrite buffer implementation.
 *
 * The following code implements a simple buffer used in order to accumulate
 * changes while the background process is rewriting the AOF file.
 *
 * We only need to append, but can't just use realloc with a large block
 * because 'huge' reallocs are not always handled as one could expect
 * (via remapping of pages at OS level) but may involve copying data.
 *
 * For this reason we use a list of blocks, every block is
 * AOF_RW_BUF_BLOCK_SIZE bytes.
 * ------------------------------------------------------------------------- */

#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */

/* One fixed-size chunk of the AOF rewrite buffer. The functions in this file
 * keep used + free == AOF_RW_BUF_BLOCK_SIZE: data is appended at buf+used and
 * consumed from the start of buf. */
typedef struct aofrwblock {
    unsigned long used, free;   /* Bytes filled in buf / bytes still available. */
    char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;
66 
67 /* This function free the old AOF rewrite buffer if needed, and initialize
68  * a fresh new one. It tests for server.aof_rewrite_buf_blocks equal to NULL
69  * so can be used for the first initialization as well. */
aofRewriteBufferReset(void)70 void aofRewriteBufferReset(void) {
71     if (server.aof_rewrite_buf_blocks)
72         listRelease(server.aof_rewrite_buf_blocks);
73 
74     server.aof_rewrite_buf_blocks = listCreate();
75     listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
76 }
77 
78 /* Return the current size of the AOF rewrite buffer. */
aofRewriteBufferSize(void)79 unsigned long aofRewriteBufferSize(void) {
80     listNode *ln;
81     listIter li;
82     unsigned long size = 0;
83 
84     listRewind(server.aof_rewrite_buf_blocks,&li);
85     while((ln = listNext(&li))) {
86         aofrwblock *block = listNodeValue(ln);
87         size += block->used;
88     }
89     return size;
90 }
91 
92 /* Event handler used to send data to the child process doing the AOF
93  * rewrite. We send pieces of our AOF differences buffer so that the final
94  * write when the child finishes the rewrite will be small. */
aofChildWriteDiffData(aeEventLoop * el,int fd,void * privdata,int mask)95 void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
96     listNode *ln;
97     aofrwblock *block;
98     ssize_t nwritten;
99     UNUSED(el);
100     UNUSED(fd);
101     UNUSED(privdata);
102     UNUSED(mask);
103 
104     while(1) {
105         ln = listFirst(server.aof_rewrite_buf_blocks);
106         block = ln ? ln->value : NULL;
107         if (server.aof_stop_sending_diff || !block) {
108             aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
109                               AE_WRITABLE);
110             return;
111         }
112         if (block->used > 0) {
113             nwritten = write(server.aof_pipe_write_data_to_child,
114                              block->buf,block->used);
115             if (nwritten <= 0) return;
116             memmove(block->buf,block->buf+nwritten,block->used-nwritten);
117             block->used -= nwritten;
118             block->free += nwritten;
119         }
120         if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
121     }
122 }
123 
124 /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
aofRewriteBufferAppend(unsigned char * s,unsigned long len)125 void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
126     listNode *ln = listLast(server.aof_rewrite_buf_blocks);
127     aofrwblock *block = ln ? ln->value : NULL;
128 
129     while(len) {
130         /* If we already got at least an allocated block, try appending
131          * at least some piece into it. */
132         if (block) {
133             unsigned long thislen = (block->free < len) ? block->free : len;
134             if (thislen) {  /* The current block is not already full. */
135                 memcpy(block->buf+block->used, s, thislen);
136                 block->used += thislen;
137                 block->free -= thislen;
138                 s += thislen;
139                 len -= thislen;
140             }
141         }
142 
143         if (len) { /* First block to allocate, or need another block. */
144             int numblocks;
145 
146             block = zmalloc(sizeof(*block));
147             block->free = AOF_RW_BUF_BLOCK_SIZE;
148             block->used = 0;
149             listAddNodeTail(server.aof_rewrite_buf_blocks,block);
150 
151             /* Log every time we cross more 10 or 100 blocks, respectively
152              * as a notice or warning. */
153             numblocks = listLength(server.aof_rewrite_buf_blocks);
154             if (((numblocks+1) % 10) == 0) {
155                 int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
156                                                          LL_NOTICE;
157                 serverLog(level,"Background AOF buffer size: %lu MB",
158                     aofRewriteBufferSize()/(1024*1024));
159             }
160         }
161     }
162 
163     /* Install a file event to send data to the rewrite child if there is
164      * not one already. */
165     if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
166         aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
167             AE_WRITABLE, aofChildWriteDiffData, NULL);
168     }
169 }
170 
171 /* Write the buffer (possibly composed of multiple blocks) into the specified
172  * fd. If a short write or any other error happens -1 is returned,
173  * otherwise the number of bytes written is returned. */
aofRewriteBufferWrite(int fd)174 ssize_t aofRewriteBufferWrite(int fd) {
175     listNode *ln;
176     listIter li;
177     ssize_t count = 0;
178 
179     listRewind(server.aof_rewrite_buf_blocks,&li);
180     while((ln = listNext(&li))) {
181         aofrwblock *block = listNodeValue(ln);
182         ssize_t nwritten;
183 
184         if (block->used) {
185             nwritten = write(fd,block->buf,block->used);
186             if (nwritten != (ssize_t)block->used) {
187                 if (nwritten == 0) errno = EIO;
188                 return -1;
189             }
190             count += nwritten;
191         }
192     }
193     return count;
194 }
195 
196 /* ----------------------------------------------------------------------------
197  * AOF file implementation
198  * ------------------------------------------------------------------------- */
199 
200 /* Return true if an AOf fsync is currently already in progress in a
201  * BIO thread. */
aofFsyncInProgress(void)202 int aofFsyncInProgress(void) {
203     return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
204 }
205 
206 /* Starts a background task that performs fsync() against the specified
207  * file descriptor (the one of the AOF file) in another thread. */
aof_background_fsync(int fd)208 void aof_background_fsync(int fd) {
209     bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
210 }
211 
212 /* Kills an AOFRW child process if exists */
killAppendOnlyChild(void)213 static void killAppendOnlyChild(void) {
214     int statloc;
215     /* No AOFRW child? return. */
216     if (server.aof_child_pid == -1) return;
217     /* Kill AOFRW child, wait for child exit. */
218     serverLog(LL_NOTICE,"Killing running AOF rewrite child: %ld",
219         (long) server.aof_child_pid);
220     if (kill(server.aof_child_pid,SIGUSR1) != -1) {
221         while(wait3(&statloc,0,NULL) != server.aof_child_pid);
222     }
223     /* Reset the buffer accumulating changes while the child saves. */
224     aofRewriteBufferReset();
225     aofRemoveTempFile(server.aof_child_pid);
226     server.aof_child_pid = -1;
227     server.aof_rewrite_time_start = -1;
228     /* Close pipes used for IPC between the two processes. */
229     aofClosePipes();
230 }
231 
232 /* Called when the user switches from "appendonly yes" to "appendonly no"
233  * at runtime using the CONFIG command. */
stopAppendOnly(void)234 void stopAppendOnly(void) {
235     serverAssert(server.aof_state != AOF_OFF);
236     flushAppendOnlyFile(1);
237     redis_fsync(server.aof_fd);
238     close(server.aof_fd);
239 
240     server.aof_fd = -1;
241     server.aof_selected_db = -1;
242     server.aof_state = AOF_OFF;
243     killAppendOnlyChild();
244 }
245 
246 /* Called when the user switches from "appendonly no" to "appendonly yes"
247  * at runtime using the CONFIG command. */
startAppendOnly(void)248 int startAppendOnly(void) {
249     char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
250     int newfd;
251 
252     newfd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
253     serverAssert(server.aof_state == AOF_OFF);
254     if (newfd == -1) {
255         char *cwdp = getcwd(cwd,MAXPATHLEN);
256 
257         serverLog(LL_WARNING,
258             "Redis needs to enable the AOF but can't open the "
259             "append only file %s (in server root dir %s): %s",
260             server.aof_filename,
261             cwdp ? cwdp : "unknown",
262             strerror(errno));
263         return C_ERR;
264     }
265     if (server.rdb_child_pid != -1) {
266         server.aof_rewrite_scheduled = 1;
267         serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible.");
268     } else {
269         /* If there is a pending AOF rewrite, we need to switch it off and
270          * start a new one: the old one cannot be reused because it is not
271          * accumulating the AOF buffer. */
272         if (server.aof_child_pid != -1) {
273             serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
274             killAppendOnlyChild();
275         }
276         if (rewriteAppendOnlyFileBackground() == C_ERR) {
277             close(newfd);
278             serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
279             return C_ERR;
280         }
281     }
282     /* We correctly switched on AOF, now wait for the rewrite to be complete
283      * in order to append data on disk. */
284     server.aof_state = AOF_WAIT_REWRITE;
285     server.aof_last_fsync = server.unixtime;
286     server.aof_fd = newfd;
287     return C_OK;
288 }
289 
/* This is a wrapper to the write syscall in order to retry on short writes
 * or if the syscall gets interrupted. It could look strange that we retry
 * on short writes given that we are writing to a block device: normally if
 * the first call is short, there is a end-of-space condition, so the next
 * is likely to fail. However apparently in modern systems this is no longer
 * true, and in general it looks just more resilient to retry the write. If
 * there is an actual error condition we'll get it at the next try.
 *
 * Returns the number of bytes written, which is less than 'len' on a short
 * write. Returns -1 (with errno set by write()) only if an error occurs
 * before any byte is written. A write() that returns 0 makes no progress:
 * instead of looping forever, the loop stops and reports what was written
 * so far as a short write. */
ssize_t aofWrite(int fd, const char *buf, size_t len) {
    ssize_t nwritten = 0, totwritten = 0;

    while(len) {
        nwritten = write(fd, buf, len);

        if (nwritten < 0) {
            if (errno == EINTR) {
                continue;
            }
            return totwritten ? totwritten : -1;
        }

        /* No progress possible: avoid an infinite loop. */
        if (nwritten == 0) break;

        len -= nwritten;
        buf += nwritten;
        totwritten += nwritten;
    }

    return totwritten;
}
317 
318 /* Write the append only file buffer on disk.
319  *
320  * Since we are required to write the AOF before replying to the client,
321  * and the only way the client socket can get a write is entering when the
322  * the event loop, we accumulate all the AOF writes in a memory
323  * buffer and write it on disk using this function just before entering
324  * the event loop again.
325  *
326  * About the 'force' argument:
327  *
328  * When the fsync policy is set to 'everysec' we may delay the flush if there
329  * is still an fsync() going on in the background thread, since for instance
330  * on Linux write(2) will be blocked by the background fsync anyway.
331  * When this happens we remember that there is some aof buffer to be
332  * flushed ASAP, and will try to do that in the serverCron() function.
333  *
334  * However if force is set to 1 we'll write regardless of the background
335  * fsync. */
336 #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
flushAppendOnlyFile(int force)337 void flushAppendOnlyFile(int force) {
338     ssize_t nwritten;
339     int sync_in_progress = 0;
340     mstime_t latency;
341 
342     if (sdslen(server.aof_buf) == 0) {
343         /* Check if we need to do fsync even the aof buffer is empty,
344          * because previously in AOF_FSYNC_EVERYSEC mode, fsync is
345          * called only when aof buffer is not empty, so if users
346          * stop write commands before fsync called in one second,
347          * the data in page cache cannot be flushed in time. */
348         if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
349             server.aof_fsync_offset != server.aof_current_size &&
350             server.unixtime > server.aof_last_fsync &&
351             !(sync_in_progress = aofFsyncInProgress())) {
352             goto try_fsync;
353         } else {
354             return;
355         }
356     }
357 
358     if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
359         sync_in_progress = aofFsyncInProgress();
360 
361     if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
362         /* With this append fsync policy we do background fsyncing.
363          * If the fsync is still in progress we can try to delay
364          * the write for a couple of seconds. */
365         if (sync_in_progress) {
366             if (server.aof_flush_postponed_start == 0) {
367                 /* No previous write postponing, remember that we are
368                  * postponing the flush and return. */
369                 server.aof_flush_postponed_start = server.unixtime;
370                 return;
371             } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
372                 /* We were already waiting for fsync to finish, but for less
373                  * than two seconds this is still ok. Postpone again. */
374                 return;
375             }
376             /* Otherwise fall trough, and go write since we can't wait
377              * over two seconds. */
378             server.aof_delayed_fsync++;
379             serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
380         }
381     }
382     /* We want to perform a single write. This should be guaranteed atomic
383      * at least if the filesystem we are writing is a real physical one.
384      * While this will save us against the server being killed I don't think
385      * there is much to do about the whole server stopping for power problems
386      * or alike */
387 
388     latencyStartMonitor(latency);
389     nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
390     latencyEndMonitor(latency);
391     /* We want to capture different events for delayed writes:
392      * when the delay happens with a pending fsync, or with a saving child
393      * active, and when the above two conditions are missing.
394      * We also use an additional event name to save all samples which is
395      * useful for graphing / monitoring purposes. */
396     if (sync_in_progress) {
397         latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
398     } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
399         latencyAddSampleIfNeeded("aof-write-active-child",latency);
400     } else {
401         latencyAddSampleIfNeeded("aof-write-alone",latency);
402     }
403     latencyAddSampleIfNeeded("aof-write",latency);
404 
405     /* We performed the write so reset the postponed flush sentinel to zero. */
406     server.aof_flush_postponed_start = 0;
407 
408     if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
409         static time_t last_write_error_log = 0;
410         int can_log = 0;
411 
412         /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
413         if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
414             can_log = 1;
415             last_write_error_log = server.unixtime;
416         }
417 
418         /* Log the AOF write error and record the error code. */
419         if (nwritten == -1) {
420             if (can_log) {
421                 serverLog(LL_WARNING,"Error writing to the AOF file: %s",
422                     strerror(errno));
423                 server.aof_last_write_errno = errno;
424             }
425         } else {
426             if (can_log) {
427                 serverLog(LL_WARNING,"Short write while writing to "
428                                        "the AOF file: (nwritten=%lld, "
429                                        "expected=%lld)",
430                                        (long long)nwritten,
431                                        (long long)sdslen(server.aof_buf));
432             }
433 
434             if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
435                 if (can_log) {
436                     serverLog(LL_WARNING, "Could not remove short write "
437                              "from the append-only file.  Redis may refuse "
438                              "to load the AOF the next time it starts.  "
439                              "ftruncate: %s", strerror(errno));
440                 }
441             } else {
442                 /* If the ftruncate() succeeded we can set nwritten to
443                  * -1 since there is no longer partial data into the AOF. */
444                 nwritten = -1;
445             }
446             server.aof_last_write_errno = ENOSPC;
447         }
448 
449         /* Handle the AOF write error. */
450         if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
451             /* We can't recover when the fsync policy is ALWAYS since the
452              * reply for the client is already in the output buffers, and we
453              * have the contract with the user that on acknowledged write data
454              * is synced on disk. */
455             serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
456             exit(1);
457         } else {
458             /* Recover from failed write leaving data into the buffer. However
459              * set an error to stop accepting writes as long as the error
460              * condition is not cleared. */
461             server.aof_last_write_status = C_ERR;
462 
463             /* Trim the sds buffer if there was a partial write, and there
464              * was no way to undo it with ftruncate(2). */
465             if (nwritten > 0) {
466                 server.aof_current_size += nwritten;
467                 sdsrange(server.aof_buf,nwritten,-1);
468             }
469             return; /* We'll try again on the next call... */
470         }
471     } else {
472         /* Successful write(2). If AOF was in error state, restore the
473          * OK state and log the event. */
474         if (server.aof_last_write_status == C_ERR) {
475             serverLog(LL_WARNING,
476                 "AOF write error looks solved, Redis can write again.");
477             server.aof_last_write_status = C_OK;
478         }
479     }
480     server.aof_current_size += nwritten;
481 
482     /* Re-use AOF buffer when it is small enough. The maximum comes from the
483      * arena size of 4k minus some overhead (but is otherwise arbitrary). */
484     if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
485         sdsclear(server.aof_buf);
486     } else {
487         sdsfree(server.aof_buf);
488         server.aof_buf = sdsempty();
489     }
490 
491 try_fsync:
492     /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
493      * children doing I/O in the background. */
494     if (server.aof_no_fsync_on_rewrite &&
495         (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
496             return;
497 
498     /* Perform the fsync if needed. */
499     if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
500         /* redis_fsync is defined as fdatasync() for Linux in order to avoid
501          * flushing metadata. */
502         latencyStartMonitor(latency);
503         redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
504         latencyEndMonitor(latency);
505         latencyAddSampleIfNeeded("aof-fsync-always",latency);
506         server.aof_fsync_offset = server.aof_current_size;
507         server.aof_last_fsync = server.unixtime;
508     } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
509                 server.unixtime > server.aof_last_fsync)) {
510         if (!sync_in_progress) {
511             aof_background_fsync(server.aof_fd);
512             server.aof_fsync_offset = server.aof_current_size;
513         }
514         server.aof_last_fsync = server.unixtime;
515     }
516 }
517 
/* Append to 'dst' the multi-bulk (RESP) encoding of the 'argc' arguments in
 * 'argv': a "*<argc>\r\n" header followed, for every argument, by
 * "$<len>\r\n<bytes>\r\n". Returns the (possibly reallocated) sds. */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char hdr[32];
    int hdrlen;

    /* Multi-bulk header with the argument count. */
    hdr[0] = '*';
    hdrlen = 1 + ll2string(hdr+1,sizeof(hdr)-1,argc);
    hdr[hdrlen++] = '\r';
    hdr[hdrlen++] = '\n';
    dst = sdscatlen(dst,hdr,hdrlen);

    for (int i = 0; i < argc; i++) {
        /* getDecodedObject() yields an object whose ptr is an sds string. */
        robj *arg = getDecodedObject(argv[i]);
        size_t arglen = sdslen(arg->ptr);

        /* Bulk length header, then the payload and its terminator. */
        hdr[0] = '$';
        hdrlen = 1 + ll2string(hdr+1,sizeof(hdr)-1,arglen);
        hdr[hdrlen++] = '\r';
        hdr[hdrlen++] = '\n';
        dst = sdscatlen(dst,hdr,hdrlen);
        dst = sdscatlen(dst,arg->ptr,arglen);
        dst = sdscatlen(dst,"\r\n",2);

        decrRefCount(arg);
    }
    return dst;
}
542 
543 /* Create the sds representation of an PEXPIREAT command, using
544  * 'seconds' as time to live and 'cmd' to understand what command
545  * we are translating into a PEXPIREAT.
546  *
547  * This command is used in order to translate EXPIRE and PEXPIRE commands
548  * into PEXPIREAT command so that we retain precision in the append only
549  * file, and the time is always absolute and not relative. */
catAppendOnlyExpireAtCommand(sds buf,struct redisCommand * cmd,robj * key,robj * seconds)550 sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {
551     long long when;
552     robj *argv[3];
553 
554     /* Make sure we can use strtoll */
555     seconds = getDecodedObject(seconds);
556     when = strtoll(seconds->ptr,NULL,10);
557     /* Convert argument into milliseconds for EXPIRE, SETEX, EXPIREAT */
558     if (cmd->proc == expireCommand || cmd->proc == setexCommand ||
559         cmd->proc == expireatCommand)
560     {
561         when *= 1000;
562     }
563     /* Convert into absolute time for EXPIRE, PEXPIRE, SETEX, PSETEX */
564     if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
565         cmd->proc == setexCommand || cmd->proc == psetexCommand)
566     {
567         when += mstime();
568     }
569     decrRefCount(seconds);
570 
571     argv[0] = createStringObject("PEXPIREAT",9);
572     argv[1] = key;
573     argv[2] = createStringObjectFromLongLong(when);
574     buf = catAppendOnlyGenericCommand(buf, 3, argv);
575     decrRefCount(argv[0]);
576     decrRefCount(argv[2]);
577     return buf;
578 }
579 
/* Serialize command 'cmd' (argv/argc) executed against DB 'dictid' and feed
 * it to the AOF.
 *
 * A SELECT is emitted first when the target DB differs from the last one
 * appended. Relative or seconds-based expires are translated into PEXPIREAT:
 * EXPIRE/PEXPIRE/EXPIREAT, SETEX/PSETEX, and SET with EX/PX. The result is
 * appended to server.aof_buf when AOF is on. It is also appended to the
 * rewrite buffer while an AOF rewrite child is running. */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    /* The DB this command was targeting is not the same as the last command
     * we appended. To issue a SELECT command is needed. */
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        /* Translate SETEX/PSETEX to SET and PEXPIREAT */
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setCommand && argc > 3) {
        int i;
        robj *exarg = NULL, *pxarg = NULL;
        /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
        buf = catAppendOnlyGenericCommand(buf,3,argv);
        /* An EX/PX option needs a following value: stop at argc-1 so that
         * argv[i+1] never reads past the end of the argument vector. */
        for (i = 3; i + 1 < argc; i ++) {
            if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
            if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
        }
        serverAssert(!(exarg && pxarg));
        if (exarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
                                               exarg);
        if (pxarg)
            buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
                                               pxarg);
    } else {
        /* All the other commands don't need translation or need the
         * same translation already operated in the command vector
         * for the replication itself. */
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background append only file rewriting is in progress we want to
     * accumulate the differences between the child DB and the current one
     * in a buffer, so that when the child process will do its work we
     * can append the differences to the new append only file. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}
645 
646 /* ----------------------------------------------------------------------------
647  * AOF loading
648  * ------------------------------------------------------------------------- */
649 
650 /* In Redis commands are always executed in the context of a client, so in
651  * order to load the append only file we need to create a fake client. */
createFakeClient(void)652 struct client *createFakeClient(void) {
653     struct client *c = zmalloc(sizeof(*c));
654 
655     selectDb(c,0);
656     c->fd = -1;
657     c->name = NULL;
658     c->querybuf = sdsempty();
659     c->querybuf_peak = 0;
660     c->argc = 0;
661     c->argv = NULL;
662     c->bufpos = 0;
663     c->flags = 0;
664     c->btype = BLOCKED_NONE;
665     /* We set the fake client as a slave waiting for the synchronization
666      * so that Redis will not try to send replies to this client. */
667     c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
668     c->reply = listCreate();
669     c->reply_bytes = 0;
670     c->obuf_soft_limit_reached_time = 0;
671     c->watched_keys = listCreate();
672     c->peerid = NULL;
673     listSetFreeMethod(c->reply,freeClientReplyValue);
674     listSetDupMethod(c->reply,dupClientReplyValue);
675     initClientMultiState(c);
676     return c;
677 }
678 
freeFakeClientArgv(struct client * c)679 void freeFakeClientArgv(struct client *c) {
680     int j;
681 
682     for (j = 0; j < c->argc; j++)
683         decrRefCount(c->argv[j]);
684     zfree(c->argv);
685 }
686 
freeFakeClient(struct client * c)687 void freeFakeClient(struct client *c) {
688     sdsfree(c->querybuf);
689     listRelease(c->reply);
690     listRelease(c->watched_keys);
691     freeClientMultiState(c);
692     zfree(c);
693 }
694 
695 /* Replay the append log file. On success C_OK is returned. On non fatal
696  * error (the append only file is zero-length) C_ERR is returned. On
697  * fatal error an error message is logged and the program exists. */
loadAppendOnlyFile(char * filename)698 int loadAppendOnlyFile(char *filename) {
699     struct client *fakeClient;
700     FILE *fp = fopen(filename,"r");
701     struct redis_stat sb;
702     int old_aof_state = server.aof_state;
703     long loops = 0;
704     off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
705     off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */
706 
707     if (fp == NULL) {
708         serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
709         exit(1);
710     }
711 
712     /* Handle a zero-length AOF file as a special case. An empty AOF file
713      * is a valid AOF because an empty server with AOF enabled will create
714      * a zero length file at startup, that will remain like that if no write
715      * operation is received. */
716     if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
717         server.aof_current_size = 0;
718         server.aof_fsync_offset = server.aof_current_size;
719         fclose(fp);
720         return C_ERR;
721     }
722 
723     /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
724      * to the same file we're about to read. */
725     server.aof_state = AOF_OFF;
726 
727     fakeClient = createFakeClient();
728     startLoading(fp);
729 
730     /* Check if this AOF file has an RDB preamble. In that case we need to
731      * load the RDB file and later continue loading the AOF tail. */
732     char sig[5]; /* "REDIS" */
733     if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
734         /* No RDB preamble, seek back at 0 offset. */
735         if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
736     } else {
737         /* RDB preamble. Pass loading the RDB functions. */
738         rio rdb;
739 
740         serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
741         if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
742         rioInitWithFile(&rdb,fp);
743         if (rdbLoadRio(&rdb,NULL,1) != C_OK) {
744             serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
745             goto readerr;
746         } else {
747             serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
748         }
749     }
750 
751     /* Read the actual AOF file, in REPL format, command by command. */
752     while(1) {
753         int argc, j;
754         unsigned long len;
755         robj **argv;
756         char buf[128];
757         sds argsds;
758         struct redisCommand *cmd;
759 
760         /* Serve the clients from time to time */
761         if (!(loops++ % 1000)) {
762             loadingProgress(ftello(fp));
763             processEventsWhileBlocked();
764         }
765 
766         if (fgets(buf,sizeof(buf),fp) == NULL) {
767             if (feof(fp))
768                 break;
769             else
770                 goto readerr;
771         }
772         if (buf[0] != '*') goto fmterr;
773         if (buf[1] == '\0') goto readerr;
774         argc = atoi(buf+1);
775         if (argc < 1) goto fmterr;
776 
777         argv = zmalloc(sizeof(robj*)*argc);
778         fakeClient->argc = argc;
779         fakeClient->argv = argv;
780 
781         for (j = 0; j < argc; j++) {
782             if (fgets(buf,sizeof(buf),fp) == NULL) {
783                 fakeClient->argc = j; /* Free up to j-1. */
784                 freeFakeClientArgv(fakeClient);
785                 goto readerr;
786             }
787             if (buf[0] != '$') goto fmterr;
788             len = strtol(buf+1,NULL,10);
789             argsds = sdsnewlen(SDS_NOINIT,len);
790             if (len && fread(argsds,len,1,fp) == 0) {
791                 sdsfree(argsds);
792                 fakeClient->argc = j; /* Free up to j-1. */
793                 freeFakeClientArgv(fakeClient);
794                 goto readerr;
795             }
796             argv[j] = createObject(OBJ_STRING,argsds);
797             if (fread(buf,2,1,fp) == 0) {
798                 fakeClient->argc = j+1; /* Free up to j. */
799                 freeFakeClientArgv(fakeClient);
800                 goto readerr; /* discard CRLF */
801             }
802         }
803 
804         /* Command lookup */
805         cmd = lookupCommand(argv[0]->ptr);
806         if (!cmd) {
807             serverLog(LL_WARNING,
808                 "Unknown command '%s' reading the append only file",
809                 (char*)argv[0]->ptr);
810             exit(1);
811         }
812 
813         if (cmd == server.multiCommand) valid_before_multi = valid_up_to;
814 
815         /* Run the command in the context of a fake client */
816         fakeClient->cmd = cmd;
817         if (fakeClient->flags & CLIENT_MULTI &&
818             fakeClient->cmd->proc != execCommand)
819         {
820             queueMultiCommand(fakeClient);
821         } else {
822             cmd->proc(fakeClient);
823         }
824 
825         /* The fake client should not have a reply */
826         serverAssert(fakeClient->bufpos == 0 &&
827                      listLength(fakeClient->reply) == 0);
828 
829         /* The fake client should never get blocked */
830         serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);
831 
832         /* Clean up. Command code may have changed argv/argc so we use the
833          * argv/argc of the client instead of the local variables. */
834         freeFakeClientArgv(fakeClient);
835         fakeClient->cmd = NULL;
836         if (server.aof_load_truncated) valid_up_to = ftello(fp);
837     }
838 
839     /* This point can only be reached when EOF is reached without errors.
840      * If the client is in the middle of a MULTI/EXEC, handle it as it was
841      * a short read, even if technically the protocol is correct: we want
842      * to remove the unprocessed tail and continue. */
843     if (fakeClient->flags & CLIENT_MULTI) {
844         serverLog(LL_WARNING,
845             "Revert incomplete MULTI/EXEC transaction in AOF file");
846         valid_up_to = valid_before_multi;
847         goto uxeof;
848     }
849 
850 loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
851     fclose(fp);
852     freeFakeClient(fakeClient);
853     server.aof_state = old_aof_state;
854     stopLoading();
855     aofUpdateCurrentSize();
856     server.aof_rewrite_base_size = server.aof_current_size;
857     server.aof_fsync_offset = server.aof_current_size;
858     return C_OK;
859 
860 readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */
861     if (!feof(fp)) {
862         if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
863         serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
864         exit(1);
865     }
866 
867 uxeof: /* Unexpected AOF end of file. */
868     if (server.aof_load_truncated) {
869         serverLog(LL_WARNING,"!!! Warning: short read while loading the AOF file !!!");
870         serverLog(LL_WARNING,"!!! Truncating the AOF at offset %llu !!!",
871             (unsigned long long) valid_up_to);
872         if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {
873             if (valid_up_to == -1) {
874                 serverLog(LL_WARNING,"Last valid command offset is invalid");
875             } else {
876                 serverLog(LL_WARNING,"Error truncating the AOF file: %s",
877                     strerror(errno));
878             }
879         } else {
880             /* Make sure the AOF file descriptor points to the end of the
881              * file after the truncate call. */
882             if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {
883                 serverLog(LL_WARNING,"Can't seek the end of the AOF file: %s",
884                     strerror(errno));
885             } else {
886                 serverLog(LL_WARNING,
887                     "AOF loaded anyway because aof-load-truncated is enabled");
888                 goto loaded_ok;
889             }
890         }
891     }
892     if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
893     serverLog(LL_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.");
894     exit(1);
895 
896 fmterr: /* Format error. */
897     if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
898     serverLog(LL_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
899     exit(1);
900 }
901 
902 /* ----------------------------------------------------------------------------
903  * AOF rewrite
904  * ------------------------------------------------------------------------- */
905 
906 /* Delegate writing an object to writing a bulk string or bulk long long.
907  * This is not placed in rio.c since that adds the server.h dependency. */
rioWriteBulkObject(rio * r,robj * obj)908 int rioWriteBulkObject(rio *r, robj *obj) {
909     /* Avoid using getDecodedObject to help copy-on-write (we are often
910      * in a child process when this function is called). */
911     if (obj->encoding == OBJ_ENCODING_INT) {
912         return rioWriteBulkLongLong(r,(long)obj->ptr);
913     } else if (sdsEncodedObject(obj)) {
914         return rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr));
915     } else {
916         serverPanic("Unknown string encoding");
917     }
918 }
919 
920 /* Emit the commands needed to rebuild a list object.
921  * The function returns 0 on error, 1 on success. */
rewriteListObject(rio * r,robj * key,robj * o)922 int rewriteListObject(rio *r, robj *key, robj *o) {
923     long long count = 0, items = listTypeLength(o);
924 
925     if (o->encoding == OBJ_ENCODING_QUICKLIST) {
926         quicklist *list = o->ptr;
927         quicklistIter *li = quicklistGetIterator(list, AL_START_HEAD);
928         quicklistEntry entry;
929 
930         while (quicklistNext(li,&entry)) {
931             if (count == 0) {
932                 int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
933                     AOF_REWRITE_ITEMS_PER_CMD : items;
934                 if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
935                 if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
936                 if (rioWriteBulkObject(r,key) == 0) return 0;
937             }
938 
939             if (entry.value) {
940                 if (rioWriteBulkString(r,(char*)entry.value,entry.sz) == 0) return 0;
941             } else {
942                 if (rioWriteBulkLongLong(r,entry.longval) == 0) return 0;
943             }
944             if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
945             items--;
946         }
947         quicklistReleaseIterator(li);
948     } else {
949         serverPanic("Unknown list encoding");
950     }
951     return 1;
952 }
953 
954 /* Emit the commands needed to rebuild a set object.
955  * The function returns 0 on error, 1 on success. */
rewriteSetObject(rio * r,robj * key,robj * o)956 int rewriteSetObject(rio *r, robj *key, robj *o) {
957     long long count = 0, items = setTypeSize(o);
958 
959     if (o->encoding == OBJ_ENCODING_INTSET) {
960         int ii = 0;
961         int64_t llval;
962 
963         while(intsetGet(o->ptr,ii++,&llval)) {
964             if (count == 0) {
965                 int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
966                     AOF_REWRITE_ITEMS_PER_CMD : items;
967 
968                 if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
969                 if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
970                 if (rioWriteBulkObject(r,key) == 0) return 0;
971             }
972             if (rioWriteBulkLongLong(r,llval) == 0) return 0;
973             if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
974             items--;
975         }
976     } else if (o->encoding == OBJ_ENCODING_HT) {
977         dictIterator *di = dictGetIterator(o->ptr);
978         dictEntry *de;
979 
980         while((de = dictNext(di)) != NULL) {
981             sds ele = dictGetKey(de);
982             if (count == 0) {
983                 int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
984                     AOF_REWRITE_ITEMS_PER_CMD : items;
985 
986                 if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
987                 if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
988                 if (rioWriteBulkObject(r,key) == 0) return 0;
989             }
990             if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
991             if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
992             items--;
993         }
994         dictReleaseIterator(di);
995     } else {
996         serverPanic("Unknown set encoding");
997     }
998     return 1;
999 }
1000 
1001 /* Emit the commands needed to rebuild a sorted set object.
1002  * The function returns 0 on error, 1 on success. */
rewriteSortedSetObject(rio * r,robj * key,robj * o)1003 int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
1004     long long count = 0, items = zsetLength(o);
1005 
1006     if (o->encoding == OBJ_ENCODING_ZIPLIST) {
1007         unsigned char *zl = o->ptr;
1008         unsigned char *eptr, *sptr;
1009         unsigned char *vstr;
1010         unsigned int vlen;
1011         long long vll;
1012         double score;
1013 
1014         eptr = ziplistIndex(zl,0);
1015         serverAssert(eptr != NULL);
1016         sptr = ziplistNext(zl,eptr);
1017         serverAssert(sptr != NULL);
1018 
1019         while (eptr != NULL) {
1020             serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
1021             score = zzlGetScore(sptr);
1022 
1023             if (count == 0) {
1024                 int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
1025                     AOF_REWRITE_ITEMS_PER_CMD : items;
1026 
1027                 if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
1028                 if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
1029                 if (rioWriteBulkObject(r,key) == 0) return 0;
1030             }
1031             if (rioWriteBulkDouble(r,score) == 0) return 0;
1032             if (vstr != NULL) {
1033                 if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0;
1034             } else {
1035                 if (rioWriteBulkLongLong(r,vll) == 0) return 0;
1036             }
1037             zzlNext(zl,&eptr,&sptr);
1038             if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
1039             items--;
1040         }
1041     } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
1042         zset *zs = o->ptr;
1043         dictIterator *di = dictGetIterator(zs->dict);
1044         dictEntry *de;
1045 
1046         while((de = dictNext(di)) != NULL) {
1047             sds ele = dictGetKey(de);
1048             double *score = dictGetVal(de);
1049 
1050             if (count == 0) {
1051                 int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
1052                     AOF_REWRITE_ITEMS_PER_CMD : items;
1053 
1054                 if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
1055                 if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
1056                 if (rioWriteBulkObject(r,key) == 0) return 0;
1057             }
1058             if (rioWriteBulkDouble(r,*score) == 0) return 0;
1059             if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
1060             if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
1061             items--;
1062         }
1063         dictReleaseIterator(di);
1064     } else {
1065         serverPanic("Unknown sorted zset encoding");
1066     }
1067     return 1;
1068 }
1069 
1070 /* Write either the key or the value of the currently selected item of a hash.
1071  * The 'hi' argument passes a valid Redis hash iterator.
1072  * The 'what' filed specifies if to write a key or a value and can be
1073  * either OBJ_HASH_KEY or OBJ_HASH_VALUE.
1074  *
1075  * The function returns 0 on error, non-zero on success. */
rioWriteHashIteratorCursor(rio * r,hashTypeIterator * hi,int what)1076 static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
1077     if (hi->encoding == OBJ_ENCODING_ZIPLIST) {
1078         unsigned char *vstr = NULL;
1079         unsigned int vlen = UINT_MAX;
1080         long long vll = LLONG_MAX;
1081 
1082         hashTypeCurrentFromZiplist(hi, what, &vstr, &vlen, &vll);
1083         if (vstr)
1084             return rioWriteBulkString(r, (char*)vstr, vlen);
1085         else
1086             return rioWriteBulkLongLong(r, vll);
1087     } else if (hi->encoding == OBJ_ENCODING_HT) {
1088         sds value = hashTypeCurrentFromHashTable(hi, what);
1089         return rioWriteBulkString(r, value, sdslen(value));
1090     }
1091 
1092     serverPanic("Unknown hash encoding");
1093     return 0;
1094 }
1095 
1096 /* Emit the commands needed to rebuild a hash object.
1097  * The function returns 0 on error, 1 on success. */
rewriteHashObject(rio * r,robj * key,robj * o)1098 int rewriteHashObject(rio *r, robj *key, robj *o) {
1099     hashTypeIterator *hi;
1100     long long count = 0, items = hashTypeLength(o);
1101 
1102     hi = hashTypeInitIterator(o);
1103     while (hashTypeNext(hi) != C_ERR) {
1104         if (count == 0) {
1105             int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
1106                 AOF_REWRITE_ITEMS_PER_CMD : items;
1107 
1108             if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
1109             if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
1110             if (rioWriteBulkObject(r,key) == 0) return 0;
1111         }
1112 
1113         if (rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) == 0) return 0;
1114         if (rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE) == 0) return 0;
1115         if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
1116         items--;
1117     }
1118 
1119     hashTypeReleaseIterator(hi);
1120 
1121     return 1;
1122 }
1123 
1124 /* Helper for rewriteStreamObject() that generates a bulk string into the
1125  * AOF representing the ID 'id'. */
rioWriteBulkStreamID(rio * r,streamID * id)1126 int rioWriteBulkStreamID(rio *r,streamID *id) {
1127     int retval;
1128 
1129     sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
1130     if ((retval = rioWriteBulkString(r,replyid,sdslen(replyid))) == 0) return 0;
1131     sdsfree(replyid);
1132     return retval;
1133 }
1134 
1135 /* Helper for rewriteStreamObject(): emit the XCLAIM needed in order to
1136  * add the message described by 'nack' having the id 'rawid', into the pending
1137  * list of the specified consumer. All this in the context of the specified
1138  * key and group. */
rioWriteStreamPendingEntry(rio * r,robj * key,const char * groupname,size_t groupname_len,streamConsumer * consumer,unsigned char * rawid,streamNACK * nack)1139 int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) {
1140      /* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
1141                RETRYCOUNT <count> JUSTID FORCE. */
1142     streamID id;
1143     streamDecodeID(rawid,&id);
1144     if (rioWriteBulkCount(r,'*',12) == 0) return 0;
1145     if (rioWriteBulkString(r,"XCLAIM",6) == 0) return 0;
1146     if (rioWriteBulkObject(r,key) == 0) return 0;
1147     if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0;
1148     if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0;
1149     if (rioWriteBulkString(r,"0",1) == 0) return 0;
1150     if (rioWriteBulkStreamID(r,&id) == 0) return 0;
1151     if (rioWriteBulkString(r,"TIME",4) == 0) return 0;
1152     if (rioWriteBulkLongLong(r,nack->delivery_time) == 0) return 0;
1153     if (rioWriteBulkString(r,"RETRYCOUNT",10) == 0) return 0;
1154     if (rioWriteBulkLongLong(r,nack->delivery_count) == 0) return 0;
1155     if (rioWriteBulkString(r,"JUSTID",6) == 0) return 0;
1156     if (rioWriteBulkString(r,"FORCE",5) == 0) return 0;
1157     return 1;
1158 }
1159 
1160 /* Emit the commands needed to rebuild a stream object.
1161  * The function returns 0 on error, 1 on success. */
rewriteStreamObject(rio * r,robj * key,robj * o)1162 int rewriteStreamObject(rio *r, robj *key, robj *o) {
1163     stream *s = o->ptr;
1164     streamIterator si;
1165     streamIteratorStart(&si,s,NULL,NULL,0);
1166     streamID id;
1167     int64_t numfields;
1168 
1169     if (s->length) {
1170         /* Reconstruct the stream data using XADD commands. */
1171         while(streamIteratorGetID(&si,&id,&numfields)) {
1172             /* Emit a two elements array for each item. The first is
1173              * the ID, the second is an array of field-value pairs. */
1174 
1175             /* Emit the XADD <key> <id> ...fields... command. */
1176             if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
1177             if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
1178             if (rioWriteBulkObject(r,key) == 0) return 0;
1179             if (rioWriteBulkStreamID(r,&id) == 0) return 0;
1180             while(numfields--) {
1181                 unsigned char *field, *value;
1182                 int64_t field_len, value_len;
1183                 streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
1184                 if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
1185                 if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
1186             }
1187         }
1188     } else {
1189         /* Use the XADD MAXLEN 0 trick to generate an empty stream if
1190          * the key we are serializing is an empty string, which is possible
1191          * for the Stream type. */
1192         if (rioWriteBulkCount(r,'*',7) == 0) return 0;
1193         if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
1194         if (rioWriteBulkObject(r,key) == 0) return 0;
1195         if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0;
1196         if (rioWriteBulkString(r,"0",1) == 0) return 0;
1197         if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
1198         if (rioWriteBulkString(r,"x",1) == 0) return 0;
1199         if (rioWriteBulkString(r,"y",1) == 0) return 0;
1200     }
1201 
1202     /* Append XSETID after XADD, make sure lastid is correct,
1203      * in case of XDEL lastid. */
1204     if (rioWriteBulkCount(r,'*',3) == 0) return 0;
1205     if (rioWriteBulkString(r,"XSETID",6) == 0) return 0;
1206     if (rioWriteBulkObject(r,key) == 0) return 0;
1207     if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
1208 
1209 
1210     /* Create all the stream consumer groups. */
1211     if (s->cgroups) {
1212         raxIterator ri;
1213         raxStart(&ri,s->cgroups);
1214         raxSeek(&ri,"^",NULL,0);
1215         while(raxNext(&ri)) {
1216             streamCG *group = ri.data;
1217             /* Emit the XGROUP CREATE in order to create the group. */
1218             if (rioWriteBulkCount(r,'*',5) == 0) return 0;
1219             if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0;
1220             if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
1221             if (rioWriteBulkObject(r,key) == 0) return 0;
1222             if (rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0) return 0;
1223             if (rioWriteBulkStreamID(r,&group->last_id) == 0) return 0;
1224 
1225             /* Generate XCLAIMs for each consumer that happens to
1226              * have pending entries. Empty consumers have no semantical
1227              * value so they are discarded. */
1228             raxIterator ri_cons;
1229             raxStart(&ri_cons,group->consumers);
1230             raxSeek(&ri_cons,"^",NULL,0);
1231             while(raxNext(&ri_cons)) {
1232                 streamConsumer *consumer = ri_cons.data;
1233                 /* For the current consumer, iterate all the PEL entries
1234                  * to emit the XCLAIM protocol. */
1235                 raxIterator ri_pel;
1236                 raxStart(&ri_pel,consumer->pel);
1237                 raxSeek(&ri_pel,"^",NULL,0);
1238                 while(raxNext(&ri_pel)) {
1239                     streamNACK *nack = ri_pel.data;
1240                     if (rioWriteStreamPendingEntry(r,key,(char*)ri.key,
1241                                                    ri.key_len,consumer,
1242                                                    ri_pel.key,nack) == 0)
1243                     {
1244                         return 0;
1245                     }
1246                 }
1247                 raxStop(&ri_pel);
1248             }
1249             raxStop(&ri_cons);
1250         }
1251         raxStop(&ri);
1252     }
1253 
1254     streamIteratorStop(&si);
1255     return 1;
1256 }
1257 
1258 /* Call the module type callback in order to rewrite a data type
1259  * that is exported by a module and is not handled by Redis itself.
1260  * The function returns 0 on error, 1 on success. */
rewriteModuleObject(rio * r,robj * key,robj * o)1261 int rewriteModuleObject(rio *r, robj *key, robj *o) {
1262     RedisModuleIO io;
1263     moduleValue *mv = o->ptr;
1264     moduleType *mt = mv->type;
1265     moduleInitIOContext(io,mt,r,key);
1266     mt->aof_rewrite(&io,key,mv->value);
1267     if (io.ctx) {
1268         moduleFreeContext(io.ctx);
1269         zfree(io.ctx);
1270     }
1271     return io.error ? 0 : 1;
1272 }
1273 
1274 /* This function is called by the child rewriting the AOF file to read
1275  * the difference accumulated from the parent into a buffer, that is
1276  * concatenated at the end of the rewrite. */
aofReadDiffFromParent(void)1277 ssize_t aofReadDiffFromParent(void) {
1278     char buf[65536]; /* Default pipe buffer size on most Linux systems. */
1279     ssize_t nread, total = 0;
1280 
1281     while ((nread =
1282             read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
1283         server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
1284         total += nread;
1285     }
1286     return total;
1287 }
1288 
/* Serialize the whole dataset into 'aof' as a stream of commands. For every
 * non-empty database it writes a SELECT, then for each key the commands
 * rebuilding its value, followed by PEXPIREAT when the key has an expire.
 * Whenever more than AOF_READ_DIFF_INTERVAL_BYTES have been written since
 * the last check, pending diff data is pulled from the parent.
 * Returns C_OK on success, C_ERR on write error (the iterator is released). */
int rewriteAppendOnlyFileRio(rio *aof) {
    dictIterator *iter = NULL;
    dictEntry *entry;
    size_t last_diff_read = 0;
    int dbid;

    for (dbid = 0; dbid < server.dbnum; dbid++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db + dbid;
        dict *keyspace = db->dict;

        if (dictSize(keyspace) == 0) continue;
        iter = dictGetSafeIterator(keyspace);

        /* SELECT the new DB */
        if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0 ||
            rioWriteBulkLongLong(aof,dbid) == 0) goto werr;

        /* Iterate this DB writing every entry */
        while ((entry = dictNext(iter)) != NULL) {
            sds keystr = dictGetKey(entry);
            robj *o = dictGetVal(entry);
            robj key;
            long long expiretime;
            int ok = 0;

            initStaticStringObject(key,keystr);
            expiretime = getExpire(db,&key);

            /* Save the key and associated value */
            switch (o->type) {
            case OBJ_STRING: {
                /* Emit a SET command with key and value. */
                char cmd[]="*3\r\n$3\r\nSET\r\n";
                ok = rioWrite(aof,cmd,sizeof(cmd)-1) != 0 &&
                     rioWriteBulkObject(aof,&key) != 0 &&
                     rioWriteBulkObject(aof,o) != 0;
                break;
            }
            case OBJ_LIST:
                ok = rewriteListObject(aof,&key,o) != 0;
                break;
            case OBJ_SET:
                ok = rewriteSetObject(aof,&key,o) != 0;
                break;
            case OBJ_ZSET:
                ok = rewriteSortedSetObject(aof,&key,o) != 0;
                break;
            case OBJ_HASH:
                ok = rewriteHashObject(aof,&key,o) != 0;
                break;
            case OBJ_STREAM:
                ok = rewriteStreamObject(aof,&key,o) != 0;
                break;
            case OBJ_MODULE:
                ok = rewriteModuleObject(aof,&key,o) != 0;
                break;
            default:
                serverPanic("Unknown object type");
            }
            if (!ok) goto werr;

            /* Save the expire time */
            if (expiretime != -1) {
                char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
                if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0 ||
                    rioWriteBulkObject(aof,&key) == 0 ||
                    rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
            }

            /* Read some diff from the parent process from time to time. */
            if (aof->processed_bytes > last_diff_read+AOF_READ_DIFF_INTERVAL_BYTES) {
                last_diff_read = aof->processed_bytes;
                aofReadDiffFromParent();
            }
        }
        dictReleaseIterator(iter);
        iter = NULL;
    }
    return C_OK;

werr:
    if (iter) dictReleaseIterator(iter);
    return C_ERR;
}
1363 
1364 /* Write a sequence of commands able to fully rebuild the dataset into
1365  * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
1366  *
1367  * In order to minimize the number of commands needed in the rewritten
1368  * log Redis uses variadic commands when possible, such as RPUSH, SADD
1369  * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
1370  * are inserted using a single command. */
rewriteAppendOnlyFile(char * filename)1371 int rewriteAppendOnlyFile(char *filename) {
1372     rio aof;
1373     FILE *fp;
1374     char tmpfile[256];
1375     char byte;
1376 
1377     /* Note that we have to use a different temp name here compared to the
1378      * one used by rewriteAppendOnlyFileBackground() function. */
1379     snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
1380     fp = fopen(tmpfile,"w");
1381     if (!fp) {
1382         serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
1383         return C_ERR;
1384     }
1385 
1386     server.aof_child_diff = sdsempty();
1387     rioInitWithFile(&aof,fp);
1388 
1389     if (server.aof_rewrite_incremental_fsync)
1390         rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
1391 
1392     if (server.aof_use_rdb_preamble) {
1393         int error;
1394         if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
1395             errno = error;
1396             goto werr;
1397         }
1398     } else {
1399         if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
1400     }
1401 
1402     /* Do an initial slow fsync here while the parent is still sending
1403      * data, in order to make the next final fsync faster. */
1404     if (fflush(fp) == EOF) goto werr;
1405     if (fsync(fileno(fp)) == -1) goto werr;
1406 
1407     /* Read again a few times to get more data from the parent.
1408      * We can't read forever (the server may receive data from clients
1409      * faster than it is able to send data to the child), so we try to read
1410      * some more data in a loop as soon as there is a good chance more data
1411      * will come. If it looks like we are wasting time, we abort (this
1412      * happens after 20 ms without new data). */
1413     int nodata = 0;
1414     mstime_t start = mstime();
1415     while(mstime()-start < 1000 && nodata < 20) {
1416         if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
1417         {
1418             nodata++;
1419             continue;
1420         }
1421         nodata = 0; /* Start counting from zero, we stop on N *contiguous*
1422                        timeouts. */
1423         aofReadDiffFromParent();
1424     }
1425 
1426     /* Ask the master to stop sending diffs. */
1427     if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
1428     if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
1429         goto werr;
1430     /* We read the ACK from the server using a 10 seconds timeout. Normally
1431      * it should reply ASAP, but just in case we lose its reply, we are sure
1432      * the child will eventually get terminated. */
1433     if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
1434         byte != '!') goto werr;
1435     serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
1436 
1437     /* Read the final diff if any. */
1438     aofReadDiffFromParent();
1439 
1440     /* Write the received diff to the file. */
1441     serverLog(LL_NOTICE,
1442         "Concatenating %.2f MB of AOF diff received from parent.",
1443         (double) sdslen(server.aof_child_diff) / (1024*1024));
1444     if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
1445         goto werr;
1446 
1447     /* Make sure data will not remain on the OS's output buffers */
1448     if (fflush(fp) == EOF) goto werr;
1449     if (fsync(fileno(fp)) == -1) goto werr;
1450     if (fclose(fp) == EOF) goto werr;
1451 
1452     /* Use RENAME to make sure the DB file is changed atomically only
1453      * if the generate DB file is ok. */
1454     if (rename(tmpfile,filename) == -1) {
1455         serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
1456         unlink(tmpfile);
1457         return C_ERR;
1458     }
1459     serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
1460     return C_OK;
1461 
1462 werr:
1463     serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
1464     fclose(fp);
1465     unlink(tmpfile);
1466     return C_ERR;
1467 }
1468 
1469 /* ----------------------------------------------------------------------------
1470  * AOF rewrite pipes for IPC
1471  * -------------------------------------------------------------------------- */
1472 
1473 /* This event handler is called when the AOF rewriting child sends us a
1474  * single '!' char to signal we should stop sending buffer diffs. The
1475  * parent sends a '!' as well to acknowledge. */
aofChildPipeReadable(aeEventLoop * el,int fd,void * privdata,int mask)1476 void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
1477     char byte;
1478     UNUSED(el);
1479     UNUSED(privdata);
1480     UNUSED(mask);
1481 
1482     if (read(fd,&byte,1) == 1 && byte == '!') {
1483         serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
1484         server.aof_stop_sending_diff = 1;
1485         if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
1486             /* If we can't send the ack, inform the user, but don't try again
1487              * since in the other side the children will use a timeout if the
1488              * kernel can't buffer our write, or, the children was
1489              * terminated. */
1490             serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
1491                 strerror(errno));
1492         }
1493     }
1494     /* Remove the handler since this can be called only one time during a
1495      * rewrite. */
1496     aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
1497 }
1498 
1499 /* Create the pipes used for parent - child process IPC during rewrite.
1500  * We have a data pipe used to send AOF incremental diffs to the child,
1501  * and two other pipes used by the children to signal it finished with
1502  * the rewrite so no more data should be written, and another for the
1503  * parent to acknowledge it understood this new condition. */
aofCreatePipes(void)1504 int aofCreatePipes(void) {
1505     int fds[6] = {-1, -1, -1, -1, -1, -1};
1506     int j;
1507 
1508     if (pipe(fds) == -1) goto error; /* parent -> children data. */
1509     if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
1510     if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
1511     /* Parent -> children data is non blocking. */
1512     if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
1513     if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
1514     if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
1515 
1516     server.aof_pipe_write_data_to_child = fds[1];
1517     server.aof_pipe_read_data_from_parent = fds[0];
1518     server.aof_pipe_write_ack_to_parent = fds[3];
1519     server.aof_pipe_read_ack_from_child = fds[2];
1520     server.aof_pipe_write_ack_to_child = fds[5];
1521     server.aof_pipe_read_ack_from_parent = fds[4];
1522     server.aof_stop_sending_diff = 0;
1523     return C_OK;
1524 
1525 error:
1526     serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
1527         strerror(errno));
1528     for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
1529     return C_ERR;
1530 }
1531 
aofClosePipes(void)1532 void aofClosePipes(void) {
1533     aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
1534     aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,AE_WRITABLE);
1535     close(server.aof_pipe_write_data_to_child);
1536     close(server.aof_pipe_read_data_from_parent);
1537     close(server.aof_pipe_write_ack_to_parent);
1538     close(server.aof_pipe_read_ack_from_child);
1539     close(server.aof_pipe_write_ack_to_child);
1540     close(server.aof_pipe_read_ack_from_parent);
1541 }
1542 
1543 /* ----------------------------------------------------------------------------
1544  * AOF background rewrite
1545  * ------------------------------------------------------------------------- */
1546 
1547 /* This is how rewriting of the append only file in background works:
1548  *
1549  * 1) The user calls BGREWRITEAOF
1550  * 2) Redis calls this function, that forks():
1551  *    2a) the child rewrite the append only file in a temp file.
1552  *    2b) the parent accumulates differences in server.aof_rewrite_buf.
1553  * 3) When the child finished '2a' exists.
1554  * 4) The parent will trap the exit code, if it's OK, will append the
1555  *    data accumulated into server.aof_rewrite_buf into the temp file, and
1556  *    finally will rename(2) the temp file in the actual file name.
1557  *    The the new file is reopened as the new append only file. Profit!
1558  */
rewriteAppendOnlyFileBackground(void)1559 int rewriteAppendOnlyFileBackground(void) {
1560     pid_t childpid;
1561     long long start;
1562 
1563     if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
1564     if (aofCreatePipes() != C_OK) return C_ERR;
1565     openChildInfoPipe();
1566     start = ustime();
1567     if ((childpid = fork()) == 0) {
1568         char tmpfile[256];
1569 
1570         /* Child */
1571         closeListeningSockets(0);
1572         resetCpuAffinity("aof-rewrite");
1573         redisSetProcTitle("redis-aof-rewrite");
1574         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
1575         if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
1576             size_t private_dirty = zmalloc_get_private_dirty(-1);
1577 
1578             if (private_dirty) {
1579                 serverLog(LL_NOTICE,
1580                     "AOF rewrite: %zu MB of memory used by copy-on-write",
1581                     private_dirty/(1024*1024));
1582             }
1583 
1584             server.child_info_data.cow_size = private_dirty;
1585             sendChildInfo(CHILD_INFO_TYPE_AOF);
1586             exitFromChild(0);
1587         } else {
1588             exitFromChild(1);
1589         }
1590     } else {
1591         /* Parent */
1592         server.stat_fork_time = ustime()-start;
1593         server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
1594         latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
1595         if (childpid == -1) {
1596             closeChildInfoPipe();
1597             serverLog(LL_WARNING,
1598                 "Can't rewrite append only file in background: fork: %s",
1599                 strerror(errno));
1600             aofClosePipes();
1601             return C_ERR;
1602         }
1603         serverLog(LL_NOTICE,
1604             "Background append only file rewriting started by pid %d",childpid);
1605         server.aof_rewrite_scheduled = 0;
1606         server.aof_rewrite_time_start = time(NULL);
1607         server.aof_child_pid = childpid;
1608         updateDictResizePolicy();
1609         /* We set appendseldb to -1 in order to force the next call to the
1610          * feedAppendOnlyFile() to issue a SELECT command, so the differences
1611          * accumulated by the parent into server.aof_rewrite_buf will start
1612          * with a SELECT statement and it will be safe to merge. */
1613         server.aof_selected_db = -1;
1614         replicationScriptCacheFlush();
1615         return C_OK;
1616     }
1617     return C_OK; /* unreached */
1618 }
1619 
/* BGREWRITEAOF command implementation.
 *
 * - If an AOF rewrite child is already running, reply with an error.
 * - If an RDB child is running, only schedule the rewrite
 *   (server.aof_rewrite_scheduled = 1) and report that.
 * - Otherwise try to start the rewrite right away, replying with a status on
 *   success or with the generic shared error on failure. */
void bgrewriteaofCommand(client *c) {
    if (server.aof_child_pid != -1) {
        addReplyError(c,"Background append only file rewriting already in progress");
        return;
    }

    if (server.rdb_child_pid != -1) {
        server.aof_rewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
        return;
    }

    if (rewriteAppendOnlyFileBackground() != C_OK) {
        addReply(c,shared.err);
        return;
    }
    addReplyStatus(c,"Background append only file rewriting started");
}
1632 
/* Delete the temporary file produced by the AOF rewrite child with the given
 * pid, i.e. "temp-rewriteaof-bg-<pid>.aof" relative to the current working
 * directory. The unlink(2) result is ignored: the file may already be gone,
 * for instance after a successful rewrite renamed it into place. */
void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int) childpid);
    unlink(path);
}
1639 
1640 /* Update the server.aof_current_size field explicitly using stat(2)
1641  * to check the size of the file. This is useful after a rewrite or after
1642  * a restart, normally the size is updated just adding the write length
1643  * to the current length, that is much faster. */
aofUpdateCurrentSize(void)1644 void aofUpdateCurrentSize(void) {
1645     struct redis_stat sb;
1646     mstime_t latency;
1647 
1648     latencyStartMonitor(latency);
1649     if (redis_fstat(server.aof_fd,&sb) == -1) {
1650         serverLog(LL_WARNING,"Unable to obtain the AOF file length. stat: %s",
1651             strerror(errno));
1652     } else {
1653         server.aof_current_size = sb.st_size;
1654     }
1655     latencyEndMonitor(latency);
1656     latencyAddSampleIfNeeded("aof-fstat",latency);
1657 }
1658 
1659 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
1660  * Handle this. */
backgroundRewriteDoneHandler(int exitcode,int bysignal)1661 void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
1662     if (!bysignal && exitcode == 0) {
1663         int newfd, oldfd;
1664         char tmpfile[256];
1665         long long now = ustime();
1666         mstime_t latency;
1667 
1668         serverLog(LL_NOTICE,
1669             "Background AOF rewrite terminated with success");
1670 
1671         /* Flush the differences accumulated by the parent to the
1672          * rewritten AOF. */
1673         latencyStartMonitor(latency);
1674         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
1675             (int)server.aof_child_pid);
1676         newfd = open(tmpfile,O_WRONLY|O_APPEND);
1677         if (newfd == -1) {
1678             serverLog(LL_WARNING,
1679                 "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
1680             goto cleanup;
1681         }
1682 
1683         if (aofRewriteBufferWrite(newfd) == -1) {
1684             serverLog(LL_WARNING,
1685                 "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
1686             close(newfd);
1687             goto cleanup;
1688         }
1689         latencyEndMonitor(latency);
1690         latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);
1691 
1692         serverLog(LL_NOTICE,
1693             "Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));
1694 
1695         /* The only remaining thing to do is to rename the temporary file to
1696          * the configured file and switch the file descriptor used to do AOF
1697          * writes. We don't want close(2) or rename(2) calls to block the
1698          * server on old file deletion.
1699          *
1700          * There are two possible scenarios:
1701          *
1702          * 1) AOF is DISABLED and this was a one time rewrite. The temporary
1703          * file will be renamed to the configured file. When this file already
1704          * exists, it will be unlinked, which may block the server.
1705          *
1706          * 2) AOF is ENABLED and the rewritten AOF will immediately start
1707          * receiving writes. After the temporary file is renamed to the
1708          * configured file, the original AOF file descriptor will be closed.
1709          * Since this will be the last reference to that file, closing it
1710          * causes the underlying file to be unlinked, which may block the
1711          * server.
1712          *
1713          * To mitigate the blocking effect of the unlink operation (either
1714          * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
1715          * use a background thread to take care of this. First, we
1716          * make scenario 1 identical to scenario 2 by opening the target file
1717          * when it exists. The unlink operation after the rename(2) will then
1718          * be executed upon calling close(2) for its descriptor. Everything to
1719          * guarantee atomicity for this switch has already happened by then, so
1720          * we don't care what the outcome or duration of that close operation
1721          * is, as long as the file descriptor is released again. */
1722         if (server.aof_fd == -1) {
1723             /* AOF disabled */
1724 
1725             /* Don't care if this fails: oldfd will be -1 and we handle that.
1726              * One notable case of -1 return is if the old file does
1727              * not exist. */
1728             oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
1729         } else {
1730             /* AOF enabled */
1731             oldfd = -1; /* We'll set this to the current AOF filedes later. */
1732         }
1733 
1734         /* Rename the temporary file. This will not unlink the target file if
1735          * it exists, because we reference it with "oldfd". */
1736         latencyStartMonitor(latency);
1737         if (rename(tmpfile,server.aof_filename) == -1) {
1738             serverLog(LL_WARNING,
1739                 "Error trying to rename the temporary AOF file %s into %s: %s",
1740                 tmpfile,
1741                 server.aof_filename,
1742                 strerror(errno));
1743             close(newfd);
1744             if (oldfd != -1) close(oldfd);
1745             goto cleanup;
1746         }
1747         latencyEndMonitor(latency);
1748         latencyAddSampleIfNeeded("aof-rename",latency);
1749 
1750         if (server.aof_fd == -1) {
1751             /* AOF disabled, we don't need to set the AOF file descriptor
1752              * to this new file, so we can close it. */
1753             close(newfd);
1754         } else {
1755             /* AOF enabled, replace the old fd with the new one. */
1756             oldfd = server.aof_fd;
1757             server.aof_fd = newfd;
1758             if (server.aof_fsync == AOF_FSYNC_ALWAYS)
1759                 redis_fsync(newfd);
1760             else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
1761                 aof_background_fsync(newfd);
1762             server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
1763             aofUpdateCurrentSize();
1764             server.aof_rewrite_base_size = server.aof_current_size;
1765             server.aof_current_size = server.aof_current_size;
1766 
1767             /* Clear regular AOF buffer since its contents was just written to
1768              * the new AOF from the background rewrite buffer. */
1769             sdsfree(server.aof_buf);
1770             server.aof_buf = sdsempty();
1771         }
1772 
1773         server.aof_lastbgrewrite_status = C_OK;
1774 
1775         serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
1776         /* Change state from WAIT_REWRITE to ON if needed */
1777         if (server.aof_state == AOF_WAIT_REWRITE)
1778             server.aof_state = AOF_ON;
1779 
1780         /* Asynchronously close the overwritten AOF. */
1781         if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
1782 
1783         serverLog(LL_VERBOSE,
1784             "Background AOF rewrite signal handler took %lldus", ustime()-now);
1785     } else if (!bysignal && exitcode != 0) {
1786         /* SIGUSR1 is whitelisted, so we have a way to kill a child without
1787          * tirggering an error condition. */
1788         if (bysignal != SIGUSR1)
1789             server.aof_lastbgrewrite_status = C_ERR;
1790         serverLog(LL_WARNING,
1791             "Background AOF rewrite terminated with error");
1792     } else {
1793         server.aof_lastbgrewrite_status = C_ERR;
1794 
1795         serverLog(LL_WARNING,
1796             "Background AOF rewrite terminated by signal %d", bysignal);
1797     }
1798 
1799 cleanup:
1800     aofClosePipes();
1801     aofRewriteBufferReset();
1802     aofRemoveTempFile(server.aof_child_pid);
1803     server.aof_child_pid = -1;
1804     server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
1805     server.aof_rewrite_time_start = -1;
1806     /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
1807     if (server.aof_state == AOF_WAIT_REWRITE)
1808         server.aof_rewrite_scheduled = 1;
1809 }
1810