xref: /redis-3.2.3/src/replication.c (revision b9bb7e2d)
1 /* Asynchronous replication implementation.
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 
32 #include "redis.h"
33 
34 #include <sys/time.h>
35 #include <unistd.h>
36 #include <fcntl.h>
37 #include <sys/socket.h>
38 #include <sys/stat.h>
39 
40 void replicationDiscardCachedMaster(void);
41 void replicationResurrectCachedMaster(int newfd);
42 void replicationSendAck(void);
43 
44 /* ---------------------------------- MASTER -------------------------------- */
45 
46 void createReplicationBacklog(void) {
47     redisAssert(server.repl_backlog == NULL);
48     server.repl_backlog = zmalloc(server.repl_backlog_size);
49     server.repl_backlog_histlen = 0;
50     server.repl_backlog_idx = 0;
51     /* When a new backlog buffer is created, we increment the replication
52      * offset by one to make sure we'll not be able to PSYNC with any
53      * previous slave. This is needed because we avoid incrementing the
54      * master_repl_offset if no backlog exists nor slaves are attached. */
55     server.master_repl_offset++;
56 
57     /* We don't have any data inside our buffer, but virtually the first
58      * byte we have is the next byte that will be generated for the
59      * replication stream. */
60     server.repl_backlog_off = server.master_repl_offset+1;
61 }
62 
63 /* This function is called when the user modifies the replication backlog
64  * size at runtime. It is up to the function to both update the
65  * server.repl_backlog_size and to resize the buffer and setup it so that
66  * it contains the same data as the previous one (possibly less data, but
67  * the most recent bytes, or the same data and more free space in case the
68  * buffer is enlarged). */
69 void resizeReplicationBacklog(long long newsize) {
70     if (newsize < REDIS_REPL_BACKLOG_MIN_SIZE)
71         newsize = REDIS_REPL_BACKLOG_MIN_SIZE;
72     if (server.repl_backlog_size == newsize) return;
73 
74     server.repl_backlog_size = newsize;
75     if (server.repl_backlog != NULL) {
76         /* What we actually do is to flush the old buffer and realloc a new
77          * empty one. It will refill with new data incrementally.
78          * The reason is that copying a few gigabytes adds latency and even
79          * worse often we need to alloc additional space before freeing the
80          * old buffer. */
81         zfree(server.repl_backlog);
82         server.repl_backlog = zmalloc(server.repl_backlog_size);
83         server.repl_backlog_histlen = 0;
84         server.repl_backlog_idx = 0;
85         /* Next byte we have is... the next since the buffer is emtpy. */
86         server.repl_backlog_off = server.master_repl_offset+1;
87     }
88 }
89 
90 void freeReplicationBacklog(void) {
91     redisAssert(listLength(server.slaves) == 0);
92     zfree(server.repl_backlog);
93     server.repl_backlog = NULL;
94 }
95 
96 /* Add data to the replication backlog.
97  * This function also increments the global replication offset stored at
98  * server.master_repl_offset, because there is no case where we want to feed
99  * the backlog without incrementing the buffer. */
100 void feedReplicationBacklog(void *ptr, size_t len) {
101     unsigned char *p = ptr;
102 
103     server.master_repl_offset += len;
104 
105     /* This is a circular buffer, so write as much data we can at every
106      * iteration and rewind the "idx" index if we reach the limit. */
107     while(len) {
108         size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
109         if (thislen > len) thislen = len;
110         memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
111         server.repl_backlog_idx += thislen;
112         if (server.repl_backlog_idx == server.repl_backlog_size)
113             server.repl_backlog_idx = 0;
114         len -= thislen;
115         p += thislen;
116         server.repl_backlog_histlen += thislen;
117     }
118     if (server.repl_backlog_histlen > server.repl_backlog_size)
119         server.repl_backlog_histlen = server.repl_backlog_size;
120     /* Set the offset of the first byte we have in the backlog. */
121     server.repl_backlog_off = server.master_repl_offset -
122                               server.repl_backlog_histlen + 1;
123 }
124 
125 /* Wrapper for feedReplicationBacklog() that takes Redis string objects
126  * as input. */
127 void feedReplicationBacklogWithObject(robj *o) {
128     char llstr[REDIS_LONGSTR_SIZE];
129     void *p;
130     size_t len;
131 
132     if (o->encoding == REDIS_ENCODING_INT) {
133         len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
134         p = llstr;
135     } else {
136         len = sdslen(o->ptr);
137         p = o->ptr;
138     }
139     feedReplicationBacklog(p,len);
140 }
141 
142 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
143     listNode *ln;
144     listIter li;
145     int j, len;
146     char llstr[REDIS_LONGSTR_SIZE];
147 
148     /* If there aren't slaves, and there is no backlog buffer to populate,
149      * we can return ASAP. */
150     if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
151 
152     /* We can't have slaves attached and no backlog. */
153     redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
154 
155     /* Send SELECT command to every slave if needed. */
156     if (server.slaveseldb != dictid) {
157         robj *selectcmd;
158 
159         /* For a few DBs we have pre-computed SELECT command. */
160         if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
161             selectcmd = shared.select[dictid];
162         } else {
163             int dictid_len;
164 
165             dictid_len = ll2string(llstr,sizeof(llstr),dictid);
166             selectcmd = createObject(REDIS_STRING,
167                 sdscatprintf(sdsempty(),
168                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
169                 dictid_len, llstr));
170         }
171 
172         /* Add the SELECT command into the backlog. */
173         if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
174 
175         /* Send it to slaves. */
176         listRewind(slaves,&li);
177         while((ln = listNext(&li))) {
178             redisClient *slave = ln->value;
179             addReply(slave,selectcmd);
180         }
181 
182         if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
183             decrRefCount(selectcmd);
184     }
185     server.slaveseldb = dictid;
186 
187     /* Write the command to the replication backlog if any. */
188     if (server.repl_backlog) {
189         char aux[REDIS_LONGSTR_SIZE+3];
190 
191         /* Add the multi bulk reply length. */
192         aux[0] = '*';
193         len = ll2string(aux+1,sizeof(aux)-1,argc);
194         aux[len+1] = '\r';
195         aux[len+2] = '\n';
196         feedReplicationBacklog(aux,len+3);
197 
198         for (j = 0; j < argc; j++) {
199             long objlen = stringObjectLen(argv[j]);
200 
201             /* We need to feed the buffer with the object as a bulk reply
202              * not just as a plain string, so create the $..CRLF payload len
203              * ad add the final CRLF */
204             aux[0] = '$';
205             len = ll2string(aux+1,sizeof(aux)-1,objlen);
206             aux[len+1] = '\r';
207             aux[len+2] = '\n';
208             feedReplicationBacklog(aux,len+3);
209             feedReplicationBacklogWithObject(argv[j]);
210             feedReplicationBacklog(aux+len+1,2);
211         }
212     }
213 
214     /* Write the command to every slave. */
215     listRewind(slaves,&li);
216     while((ln = listNext(&li))) {
217         redisClient *slave = ln->value;
218 
219         /* Don't feed slaves that are still waiting for BGSAVE to start */
220         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
221 
222         /* Feed slaves that are waiting for the initial SYNC (so these commands
223          * are queued in the output buffer until the initial SYNC completes),
224          * or are already in sync with the master. */
225 
226         /* Add the multi bulk length. */
227         addReplyMultiBulkLen(slave,argc);
228 
229         /* Finally any additional argument that was not stored inside the
230          * static buffer if any (from j to argc). */
231         for (j = 0; j < argc; j++)
232             addReplyBulk(slave,argv[j]);
233     }
234 }
235 
236 void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
237     listNode *ln;
238     listIter li;
239     int j;
240     sds cmdrepr = sdsnew("+");
241     robj *cmdobj;
242     struct timeval tv;
243 
244     gettimeofday(&tv,NULL);
245     cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
246     if (c->flags & REDIS_LUA_CLIENT) {
247         cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
248     } else if (c->flags & REDIS_UNIX_SOCKET) {
249         cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
250     } else {
251         cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
252     }
253 
254     for (j = 0; j < argc; j++) {
255         if (argv[j]->encoding == REDIS_ENCODING_INT) {
256             cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
257         } else {
258             cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
259                         sdslen(argv[j]->ptr));
260         }
261         if (j != argc-1)
262             cmdrepr = sdscatlen(cmdrepr," ",1);
263     }
264     cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
265     cmdobj = createObject(REDIS_STRING,cmdrepr);
266 
267     listRewind(monitors,&li);
268     while((ln = listNext(&li))) {
269         redisClient *monitor = ln->value;
270         addReply(monitor,cmdobj);
271     }
272     decrRefCount(cmdobj);
273 }
274 
275 /* Feed the slave 'c' with the replication backlog starting from the
276  * specified 'offset' up to the end of the backlog. */
277 long long addReplyReplicationBacklog(redisClient *c, long long offset) {
278     long long j, skip, len;
279 
280     redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset);
281 
282     if (server.repl_backlog_histlen == 0) {
283         redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero");
284         return 0;
285     }
286 
287     redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld",
288              server.repl_backlog_size);
289     redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld",
290              server.repl_backlog_off);
291     redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld",
292              server.repl_backlog_histlen);
293     redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld",
294              server.repl_backlog_idx);
295 
296     /* Compute the amount of bytes we need to discard. */
297     skip = offset - server.repl_backlog_off;
298     redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip);
299 
300     /* Point j to the oldest byte, that is actaully our
301      * server.repl_backlog_off byte. */
302     j = (server.repl_backlog_idx +
303         (server.repl_backlog_size-server.repl_backlog_histlen)) %
304         server.repl_backlog_size;
305     redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j);
306 
307     /* Discard the amount of data to seek to the specified 'offset'. */
308     j = (j + skip) % server.repl_backlog_size;
309 
310     /* Feed slave with data. Since it is a circular buffer we have to
311      * split the reply in two parts if we are cross-boundary. */
312     len = server.repl_backlog_histlen - skip;
313     redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len);
314     while(len) {
315         long long thislen =
316             ((server.repl_backlog_size - j) < len) ?
317             (server.repl_backlog_size - j) : len;
318 
319         redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
320         addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
321         len -= thislen;
322         j = 0;
323     }
324     return server.repl_backlog_histlen - skip;
325 }
326 
327 /* This function handles the PSYNC command from the point of view of a
328  * master receiving a request for partial resynchronization.
329  *
330  * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
331  * with the usual full resync. */
332 int masterTryPartialResynchronization(redisClient *c) {
333     long long psync_offset, psync_len;
334     char *master_runid = c->argv[1]->ptr;
335     char buf[128];
336     int buflen;
337 
338     /* Is the runid of this master the same advertised by the wannabe slave
339      * via PSYNC? If runid changed this master is a different instance and
340      * there is no way to continue. */
341     if (strcasecmp(master_runid, server.runid)) {
342         /* Run id "?" is used by slaves that want to force a full resync. */
343         if (master_runid[0] != '?') {
344             redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
345                 "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
346                 master_runid, server.runid);
347         } else {
348             redisLog(REDIS_NOTICE,"Full resync requested by slave.");
349         }
350         goto need_full_resync;
351     }
352 
353     /* We still have the data our slave is asking for? */
354     if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
355        REDIS_OK) goto need_full_resync;
356     if (!server.repl_backlog ||
357         psync_offset < server.repl_backlog_off ||
358         psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
359     {
360         redisLog(REDIS_NOTICE,
361             "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
362         if (psync_offset > server.master_repl_offset) {
363             redisLog(REDIS_WARNING,
364                 "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
365         }
366         goto need_full_resync;
367     }
368 
369     /* If we reached this point, we are able to perform a partial resync:
370      * 1) Set client state to make it a slave.
371      * 2) Inform the client we can continue with +CONTINUE
372      * 3) Send the backlog data (from the offset to the end) to the slave. */
373     c->flags |= REDIS_SLAVE;
374     c->replstate = REDIS_REPL_ONLINE;
375     c->repl_ack_time = server.unixtime;
376     listAddNodeTail(server.slaves,c);
377     /* We can't use the connection buffers since they are used to accumulate
378      * new commands at this stage. But we are sure the socket send buffer is
379      * emtpy so this write will never fail actually. */
380     buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
381     if (write(c->fd,buf,buflen) != buflen) {
382         freeClientAsync(c);
383         return REDIS_OK;
384     }
385     psync_len = addReplyReplicationBacklog(c,psync_offset);
386     redisLog(REDIS_NOTICE,
387         "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
388     /* Note that we don't need to set the selected DB at server.slaveseldb
389      * to -1 to force the master to emit SELECT, since the slave already
390      * has this state from the previous connection with the master. */
391 
392     refreshGoodSlavesCount();
393     return REDIS_OK; /* The caller can return, no full resync needed. */
394 
395 need_full_resync:
396     /* We need a full resync for some reason... notify the client. */
397     psync_offset = server.master_repl_offset;
398     /* Add 1 to psync_offset if it the replication backlog does not exists
399      * as when it will be created later we'll increment the offset by one. */
400     if (server.repl_backlog == NULL) psync_offset++;
401     /* Again, we can't use the connection buffers (see above). */
402     buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
403                       server.runid,psync_offset);
404     if (write(c->fd,buf,buflen) != buflen) {
405         freeClientAsync(c);
406         return REDIS_OK;
407     }
408     return REDIS_ERR;
409 }
410 
411 /* SYNC ad PSYNC command implemenation. */
412 void syncCommand(redisClient *c) {
413     /* ignore SYNC if already slave or in monitor mode */
414     if (c->flags & REDIS_SLAVE) return;
415 
416     /* Refuse SYNC requests if we are a slave but the link with our master
417      * is not ok... */
418     if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
419         addReplyError(c,"Can't SYNC while not connected with my master");
420         return;
421     }
422 
423     /* SYNC can't be issued when the server has pending data to send to
424      * the client about already issued commands. We need a fresh reply
425      * buffer registering the differences between the BGSAVE and the current
426      * dataset, so that we can copy to other slaves if needed. */
427     if (listLength(c->reply) != 0 || c->bufpos != 0) {
428         addReplyError(c,"SYNC and PSYNC are invalid with pending output");
429         return;
430     }
431 
432     redisLog(REDIS_NOTICE,"Slave asks for synchronization");
433 
434     /* Try a partial resynchronization if this is a PSYNC command.
435      * If it fails, we continue with usual full resynchronization, however
436      * when this happens masterTryPartialResynchronization() already
437      * replied with:
438      *
439      * +FULLRESYNC <runid> <offset>
440      *
441      * So the slave knows the new runid and offset to try a PSYNC later
442      * if the connection with the master is lost. */
443     if (!strcasecmp(c->argv[0]->ptr,"psync")) {
444         if (masterTryPartialResynchronization(c) == REDIS_OK) {
445             server.stat_sync_partial_ok++;
446             return; /* No full resync needed, return. */
447         } else {
448             char *master_runid = c->argv[1]->ptr;
449 
450             /* Increment stats for failed PSYNCs, but only if the
451              * runid is not "?", as this is used by slaves to force a full
452              * resync on purpose when they are not albe to partially
453              * resync. */
454             if (master_runid[0] != '?') server.stat_sync_partial_err++;
455         }
456     } else {
457         /* If a slave uses SYNC, we are dealing with an old implementation
458          * of the replication protocol (like redis-cli --slave). Flag the client
459          * so that we don't expect to receive REPLCONF ACK feedbacks. */
460         c->flags |= REDIS_PRE_PSYNC;
461     }
462 
463     /* Full resynchronization. */
464     server.stat_sync_full++;
465 
466     /* Here we need to check if there is a background saving operation
467      * in progress, or if it is required to start one */
468     if (server.rdb_child_pid != -1) {
469         /* Ok a background save is in progress. Let's check if it is a good
470          * one for replication, i.e. if there is another slave that is
471          * registering differences since the server forked to save */
472         redisClient *slave;
473         listNode *ln;
474         listIter li;
475 
476         listRewind(server.slaves,&li);
477         while((ln = listNext(&li))) {
478             slave = ln->value;
479             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
480         }
481         if (ln) {
482             /* Perfect, the server is already registering differences for
483              * another slave. Set the right state, and copy the buffer. */
484             copyClientOutputBuffer(c,slave);
485             c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
486             redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
487         } else {
488             /* No way, we need to wait for the next BGSAVE in order to
489              * register differences */
490             c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
491             redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
492         }
493     } else {
494         /* Ok we don't have a BGSAVE in progress, let's start one */
495         redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
496         if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
497             redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
498             addReplyError(c,"Unable to perform background save");
499             return;
500         }
501         c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
502         /* Flush the script cache for the new slave. */
503         replicationScriptCacheFlush();
504     }
505 
506     if (server.repl_disable_tcp_nodelay)
507         anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
508     c->repldbfd = -1;
509     c->flags |= REDIS_SLAVE;
510     server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
511     listAddNodeTail(server.slaves,c);
512     if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
513         createReplicationBacklog();
514     return;
515 }
516 
517 /* REPLCONF <option> <value> <option> <value> ...
518  * This command is used by a slave in order to configure the replication
519  * process before starting it with the SYNC command.
520  *
521  * Currently the only use of this command is to communicate to the master
522  * what is the listening port of the Slave redis instance, so that the
523  * master can accurately list slaves and their listening ports in
524  * the INFO output.
525  *
526  * In the future the same command can be used in order to configure
527  * the replication to initiate an incremental replication instead of a
528  * full resync. */
529 void replconfCommand(redisClient *c) {
530     int j;
531 
532     if ((c->argc % 2) == 0) {
533         /* Number of arguments must be odd to make sure that every
534          * option has a corresponding value. */
535         addReply(c,shared.syntaxerr);
536         return;
537     }
538 
539     /* Process every option-value pair. */
540     for (j = 1; j < c->argc; j+=2) {
541         if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
542             long port;
543 
544             if ((getLongFromObjectOrReply(c,c->argv[j+1],
545                     &port,NULL) != REDIS_OK))
546                 return;
547             c->slave_listening_port = port;
548         } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
549             /* REPLCONF ACK is used by slave to inform the master the amount
550              * of replication stream that it processed so far. It is an
551              * internal only command that normal clients should never use. */
552             long long offset;
553 
554             if (!(c->flags & REDIS_SLAVE)) return;
555             if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK))
556                 return;
557             if (offset > c->repl_ack_off)
558                 c->repl_ack_off = offset;
559             c->repl_ack_time = server.unixtime;
560             /* Note: this command does not reply anything! */
561             return;
562         } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
563             /* REPLCONF GETACK is used in order to request an ACK ASAP
564              * to the slave. */
565             if (server.masterhost && server.master) replicationSendAck();
566             /* Note: this command does not reply anything! */
567         } else {
568             addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
569                 (char*)c->argv[j]->ptr);
570             return;
571         }
572     }
573     addReply(c,shared.ok);
574 }
575 
576 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
577     redisClient *slave = privdata;
578     REDIS_NOTUSED(el);
579     REDIS_NOTUSED(mask);
580     char buf[REDIS_IOBUF_LEN];
581     ssize_t nwritten, buflen;
582 
583     /* Before sending the RDB file, we send the preamble as configured by the
584      * replication process. Currently the preamble is just the bulk count of
585      * the file in the form "$<length>\r\n". */
586     if (slave->replpreamble) {
587         nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
588         if (nwritten == -1) {
589             redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",
590                 strerror(errno));
591             freeClient(slave);
592             return;
593         }
594         sdsrange(slave->replpreamble,nwritten,-1);
595         if (sdslen(slave->replpreamble) == 0) {
596             sdsfree(slave->replpreamble);
597             slave->replpreamble = NULL;
598             /* fall through sending data. */
599         } else {
600             return;
601         }
602     }
603 
604     /* If the preamble was already transfered, send the RDB bulk data. */
605     lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
606     buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
607     if (buflen <= 0) {
608         redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
609             (buflen == 0) ? "premature EOF" : strerror(errno));
610         freeClient(slave);
611         return;
612     }
613     if ((nwritten = write(fd,buf,buflen)) == -1) {
614         if (errno != EAGAIN) {
615             redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",
616                 strerror(errno));
617             freeClient(slave);
618         }
619         return;
620     }
621     slave->repldboff += nwritten;
622     if (slave->repldboff == slave->repldbsize) {
623         close(slave->repldbfd);
624         slave->repldbfd = -1;
625         aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
626         slave->replstate = REDIS_REPL_ONLINE;
627         slave->repl_ack_time = server.unixtime;
628         if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
629             sendReplyToClient, slave) == AE_ERR) {
630             redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
631             freeClient(slave);
632             return;
633         }
634         refreshGoodSlavesCount();
635         redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
636     }
637 }
638 
639 /* This function is called at the end of every background saving.
640  * The argument bgsaveerr is REDIS_OK if the background saving succeeded
641  * otherwise REDIS_ERR is passed to the function.
642  *
643  * The goal of this function is to handle slaves waiting for a successful
644  * background saving in order to perform non-blocking synchronization. */
645 void updateSlavesWaitingBgsave(int bgsaveerr) {
646     listNode *ln;
647     int startbgsave = 0;
648     listIter li;
649 
650     listRewind(server.slaves,&li);
651     while((ln = listNext(&li))) {
652         redisClient *slave = ln->value;
653 
654         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
655             startbgsave = 1;
656             slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
657         } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
658             struct redis_stat buf;
659 
660             if (bgsaveerr != REDIS_OK) {
661                 freeClient(slave);
662                 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
663                 continue;
664             }
665             if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
666                 redis_fstat(slave->repldbfd,&buf) == -1) {
667                 freeClient(slave);
668                 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
669                 continue;
670             }
671             slave->repldboff = 0;
672             slave->repldbsize = buf.st_size;
673             slave->replstate = REDIS_REPL_SEND_BULK;
674             slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
675                 (unsigned long long) slave->repldbsize);
676 
677             aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
678             if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
679                 freeClient(slave);
680                 continue;
681             }
682         }
683     }
684     if (startbgsave) {
685         /* Since we are starting a new background save for one or more slaves,
686          * we flush the Replication Script Cache to use EVAL to propagate every
687          * new EVALSHA for the first time, since all the new slaves don't know
688          * about previous scripts. */
689         replicationScriptCacheFlush();
690         if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
691             listIter li;
692 
693             listRewind(server.slaves,&li);
694             redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
695             while((ln = listNext(&li))) {
696                 redisClient *slave = ln->value;
697 
698                 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
699                     freeClient(slave);
700             }
701         }
702     }
703 }
704 
705 /* ----------------------------------- SLAVE -------------------------------- */
706 
707 /* Abort the async download of the bulk dataset while SYNC-ing with master */
708 void replicationAbortSyncTransfer(void) {
709     redisAssert(server.repl_state == REDIS_REPL_TRANSFER);
710 
711     aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
712     close(server.repl_transfer_s);
713     close(server.repl_transfer_fd);
714     unlink(server.repl_transfer_tmpfile);
715     zfree(server.repl_transfer_tmpfile);
716     server.repl_state = REDIS_REPL_CONNECT;
717 }
718 
719 /* Avoid the master to detect the slave is timing out while loading the
720  * RDB file in initial synchronization. We send a single newline character
721  * that is valid protocol but is guaranteed to either be sent entierly or
722  * not, since the byte is indivisible.
723  *
724  * The function is called in two contexts: while we flush the current
725  * data with emptyDb(), and while we load the new data received as an
726  * RDB file from the master. */
727 void replicationSendNewlineToMaster(void) {
728     static time_t newline_sent;
729     if (time(NULL) != newline_sent) {
730         newline_sent = time(NULL);
731         if (write(server.repl_transfer_s,"\n",1) == -1) {
732             /* Pinging back in this stage is best-effort. */
733         }
734     }
735 }
736 
/* Progress callback handed to emptyDb() while the old dataset is flushed
 * away to make room for the one received from the master. It only keeps
 * the master link alive; 'privdata' is ignored. */
void replicationEmptyDbCallback(void *privdata) {
    replicationSendNewlineToMaster();
    REDIS_NOTUSED(privdata);
}
743 
744 /* Asynchronously read the SYNC payload we receive from a master */
745 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
746 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
747     char buf[4096];
748     ssize_t nread, readlen;
749     off_t left;
750     REDIS_NOTUSED(el);
751     REDIS_NOTUSED(privdata);
752     REDIS_NOTUSED(mask);
753 
754     /* If repl_transfer_size == -1 we still have to read the bulk length
755      * from the master reply. */
756     if (server.repl_transfer_size == -1) {
757         if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
758             redisLog(REDIS_WARNING,
759                 "I/O error reading bulk count from MASTER: %s",
760                 strerror(errno));
761             goto error;
762         }
763 
764         if (buf[0] == '-') {
765             redisLog(REDIS_WARNING,
766                 "MASTER aborted replication with an error: %s",
767                 buf+1);
768             goto error;
769         } else if (buf[0] == '\0') {
770             /* At this stage just a newline works as a PING in order to take
771              * the connection live. So we refresh our last interaction
772              * timestamp. */
773             server.repl_transfer_lastio = server.unixtime;
774             return;
775         } else if (buf[0] != '$') {
776             redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
777             goto error;
778         }
779         server.repl_transfer_size = strtol(buf+1,NULL,10);
780         redisLog(REDIS_NOTICE,
781             "MASTER <-> SLAVE sync: receiving %lld bytes from master",
782             (long long) server.repl_transfer_size);
783         return;
784     }
785 
786     /* Read bulk data */
787     left = server.repl_transfer_size - server.repl_transfer_read;
788     readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
789     nread = read(fd,buf,readlen);
790     if (nread <= 0) {
791         redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
792             (nread == -1) ? strerror(errno) : "connection lost");
793         replicationAbortSyncTransfer();
794         return;
795     }
796     server.repl_transfer_lastio = server.unixtime;
797     if (write(server.repl_transfer_fd,buf,nread) != nread) {
798         redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
799         goto error;
800     }
801     server.repl_transfer_read += nread;
802 
803     /* Sync data on disk from time to time, otherwise at the end of the transfer
804      * we may suffer a big delay as the memory buffers are copied into the
805      * actual disk. */
806     if (server.repl_transfer_read >=
807         server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
808     {
809         off_t sync_size = server.repl_transfer_read -
810                           server.repl_transfer_last_fsync_off;
811         rdb_fsync_range(server.repl_transfer_fd,
812             server.repl_transfer_last_fsync_off, sync_size);
813         server.repl_transfer_last_fsync_off += sync_size;
814     }
815 
816     /* Check if the transfer is now complete */
817     if (server.repl_transfer_read == server.repl_transfer_size) {
818         if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
819             redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
820             replicationAbortSyncTransfer();
821             return;
822         }
823         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
824         signalFlushedDb(-1);
825         emptyDb(replicationEmptyDbCallback);
826         /* Before loading the DB into memory we need to delete the readable
827          * handler, otherwise it will get called recursively since
828          * rdbLoad() will call the event loop to process events from time to
829          * time for non blocking loading. */
830         aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
831         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
832         if (rdbLoad(server.rdb_filename) != REDIS_OK) {
833             redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
834             replicationAbortSyncTransfer();
835             return;
836         }
837         /* Final setup of the connected slave <- master link */
838         zfree(server.repl_transfer_tmpfile);
839         close(server.repl_transfer_fd);
840         server.master = createClient(server.repl_transfer_s);
841         server.master->flags |= REDIS_MASTER;
842         server.master->authenticated = 1;
843         server.repl_state = REDIS_REPL_CONNECTED;
844         server.master->reploff = server.repl_master_initial_offset;
845         memcpy(server.master->replrunid, server.repl_master_runid,
846             sizeof(server.repl_master_runid));
847         /* If master offset is set to -1, this master is old and is not
848          * PSYNC capable, so we flag it accordingly. */
849         if (server.master->reploff == -1)
850             server.master->flags |= REDIS_PRE_PSYNC;
851         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
852         /* Restart the AOF subsystem now that we finished the sync. This
853          * will trigger an AOF rewrite, and when done will start appending
854          * to the new file. */
855         if (server.aof_state != REDIS_AOF_OFF) {
856             int retry = 10;
857 
858             stopAppendOnly();
859             while (retry-- && startAppendOnly() == REDIS_ERR) {
860                 redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
861                 sleep(1);
862             }
863             if (!retry) {
864                 redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
865                 exit(1);
866             }
867         }
868     }
869 
870     return;
871 
872 error:
873     replicationAbortSyncTransfer();
874     return;
875 }
876 
877 /* Send a synchronous command to the master. Used to send AUTH and
878  * REPLCONF commands before starting the replication with SYNC.
879  *
880  * The command returns an sds string representing the result of the
881  * operation. On error the first byte is a "-".
882  */
883 char *sendSynchronousCommand(int fd, ...) {
884     va_list ap;
885     sds cmd = sdsempty();
886     char *arg, buf[256];
887 
888     /* Create the command to send to the master, we use simple inline
889      * protocol for simplicity as currently we only send simple strings. */
890     va_start(ap,fd);
891     while(1) {
892         arg = va_arg(ap, char*);
893         if (arg == NULL) break;
894 
895         if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
896         cmd = sdscat(cmd,arg);
897     }
898     cmd = sdscatlen(cmd,"\r\n",2);
899 
900     /* Transfer command to the server. */
901     if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
902         sdsfree(cmd);
903         return sdscatprintf(sdsempty(),"-Writing to master: %s",
904                 strerror(errno));
905     }
906     sdsfree(cmd);
907 
908     /* Read the reply from the server. */
909     if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
910     {
911         return sdscatprintf(sdsempty(),"-Reading from master: %s",
912                 strerror(errno));
913     }
914     return sdsnew(buf);
915 }
916 
917 /* Try a partial resynchronization with the master if we are about to reconnect.
918  * If there is no cached master structure, at least try to issue a
919  * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
920  * command in order to obtain the master run id and the master replication
921  * global offset.
922  *
923  * This function is designed to be called from syncWithMaster(), so the
924  * following assumptions are made:
925  *
926  * 1) We pass the function an already connected socket "fd".
927  * 2) This function does not close the file descriptor "fd". However in case
928  *    of successful partial resynchronization, the function will reuse
929  *    'fd' as file descriptor of the server.master client structure.
930  *
931  * The function returns:
932  *
933  * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue.
934  * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
935  *                   In this case the master run_id and global replication
936  *                   offset is saved.
937  * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
938  *                      the caller should fall back to SYNC.
939  */
940 
941 #define PSYNC_CONTINUE 0
942 #define PSYNC_FULLRESYNC 1
943 #define PSYNC_NOT_SUPPORTED 2
944 int slaveTryPartialResynchronization(int fd) {
945     char *psync_runid;
946     char psync_offset[32];
947     sds reply;
948 
949     /* Initially set repl_master_initial_offset to -1 to mark the current
950      * master run_id and offset as not valid. Later if we'll be able to do
951      * a FULL resync using the PSYNC command we'll set the offset at the
952      * right value, so that this information will be propagated to the
953      * client structure representing the master into server.master. */
954     server.repl_master_initial_offset = -1;
955 
956     if (server.cached_master) {
957         psync_runid = server.cached_master->replrunid;
958         snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
959         redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
960     } else {
961         redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
962         psync_runid = "?";
963         memcpy(psync_offset,"-1",3);
964     }
965 
966     /* Issue the PSYNC command */
967     reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
968 
969     if (!strncmp(reply,"+FULLRESYNC",11)) {
970         char *runid = NULL, *offset = NULL;
971 
972         /* FULL RESYNC, parse the reply in order to extract the run id
973          * and the replication offset. */
974         runid = strchr(reply,' ');
975         if (runid) {
976             runid++;
977             offset = strchr(runid,' ');
978             if (offset) offset++;
979         }
980         if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
981             redisLog(REDIS_WARNING,
982                 "Master replied with wrong +FULLRESYNC syntax.");
983             /* This is an unexpected condition, actually the +FULLRESYNC
984              * reply means that the master supports PSYNC, but the reply
985              * format seems wrong. To stay safe we blank the master
986              * runid to make sure next PSYNCs will fail. */
987             memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
988         } else {
989             memcpy(server.repl_master_runid, runid, offset-runid-1);
990             server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
991             server.repl_master_initial_offset = strtoll(offset,NULL,10);
992             redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
993                 server.repl_master_runid,
994                 server.repl_master_initial_offset);
995         }
996         /* We are going to full resync, discard the cached master structure. */
997         replicationDiscardCachedMaster();
998         sdsfree(reply);
999         return PSYNC_FULLRESYNC;
1000     }
1001 
1002     if (!strncmp(reply,"+CONTINUE",9)) {
1003         /* Partial resync was accepted, set the replication state accordingly */
1004         redisLog(REDIS_NOTICE,
1005             "Successful partial resynchronization with master.");
1006         sdsfree(reply);
1007         replicationResurrectCachedMaster(fd);
1008         return PSYNC_CONTINUE;
1009     }
1010 
1011     /* If we reach this point we receied either an error since the master does
1012      * not understand PSYNC, or an unexpected reply from the master.
1013      * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
1014 
1015     if (strncmp(reply,"-ERR",4)) {
1016         /* If it's not an error, log the unexpected event. */
1017         redisLog(REDIS_WARNING,
1018             "Unexpected reply to PSYNC from master: %s", reply);
1019     } else {
1020         redisLog(REDIS_NOTICE,
1021             "Master does not support PSYNC or is in "
1022             "error state (reply: %s)", reply);
1023     }
1024     sdsfree(reply);
1025     replicationDiscardCachedMaster();
1026     return PSYNC_NOT_SUPPORTED;
1027 }
1028 
1029 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
1030     char tmpfile[256], *err;
1031     int dfd, maxtries = 5;
1032     int sockerr = 0, psync_result;
1033     socklen_t errlen = sizeof(sockerr);
1034     REDIS_NOTUSED(el);
1035     REDIS_NOTUSED(privdata);
1036     REDIS_NOTUSED(mask);
1037 
1038     /* If this event fired after the user turned the instance into a master
1039      * with SLAVEOF NO ONE we must just return ASAP. */
1040     if (server.repl_state == REDIS_REPL_NONE) {
1041         close(fd);
1042         return;
1043     }
1044 
1045     /* Check for errors in the socket. */
1046     if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
1047         sockerr = errno;
1048     if (sockerr) {
1049         aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
1050         redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
1051             strerror(sockerr));
1052         goto error;
1053     }
1054 
1055     /* If we were connecting, it's time to send a non blocking PING, we want to
1056      * make sure the master is able to reply before going into the actual
1057      * replication process where we have long timeouts in the order of
1058      * seconds (in the meantime the slave would block). */
1059     if (server.repl_state == REDIS_REPL_CONNECTING) {
1060         redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
1061         /* Delete the writable event so that the readable event remains
1062          * registered and we can wait for the PONG reply. */
1063         aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
1064         server.repl_state = REDIS_REPL_RECEIVE_PONG;
1065         /* Send the PING, don't check for errors at all, we have the timeout
1066          * that will take care about this. */
1067         syncWrite(fd,"PING\r\n",6,100);
1068         return;
1069     }
1070 
1071     /* Receive the PONG command. */
1072     if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
1073         char buf[1024];
1074 
1075         /* Delete the readable event, we no longer need it now that there is
1076          * the PING reply to read. */
1077         aeDeleteFileEvent(server.el,fd,AE_READABLE);
1078 
1079         /* Read the reply with explicit timeout. */
1080         buf[0] = '\0';
1081         if (syncReadLine(fd,buf,sizeof(buf),
1082             server.repl_syncio_timeout*1000) == -1)
1083         {
1084             redisLog(REDIS_WARNING,
1085                 "I/O error reading PING reply from master: %s",
1086                 strerror(errno));
1087             goto error;
1088         }
1089 
1090         /* We accept only two replies as valid, a positive +PONG reply
1091          * (we just check for "+") or an authentication error.
1092          * Note that older versions of Redis replied with "operation not
1093          * permitted" instead of using a proper error code, so we test
1094          * both. */
1095         if (buf[0] != '+' &&
1096             strncmp(buf,"-NOAUTH",7) != 0 &&
1097             strncmp(buf,"-ERR operation not permitted",28) != 0)
1098         {
1099             redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
1100             goto error;
1101         } else {
1102             redisLog(REDIS_NOTICE,
1103                 "Master replied to PING, replication can continue...");
1104         }
1105     }
1106 
1107     /* AUTH with the master if required. */
1108     if(server.masterauth) {
1109         err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
1110         if (err[0] == '-') {
1111             redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
1112             sdsfree(err);
1113             goto error;
1114         }
1115         sdsfree(err);
1116     }
1117 
1118     /* Set the slave port, so that Master's INFO command can list the
1119      * slave listening port correctly. */
1120     {
1121         sds port = sdsfromlonglong(server.port);
1122         err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
1123                                          NULL);
1124         sdsfree(port);
1125         /* Ignore the error if any, not all the Redis versions support
1126          * REPLCONF listening-port. */
1127         if (err[0] == '-') {
1128             redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
1129         }
1130         sdsfree(err);
1131     }
1132 
1133     /* Try a partial resynchonization. If we don't have a cached master
1134      * slaveTryPartialResynchronization() will at least try to use PSYNC
1135      * to start a full resynchronization so that we get the master run id
1136      * and the global offset, to try a partial resync at the next
1137      * reconnection attempt. */
1138     psync_result = slaveTryPartialResynchronization(fd);
1139     if (psync_result == PSYNC_CONTINUE) {
1140         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
1141         return;
1142     }
1143 
1144     /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
1145      * and the server.repl_master_runid and repl_master_initial_offset are
1146      * already populated. */
1147     if (psync_result == PSYNC_NOT_SUPPORTED) {
1148         redisLog(REDIS_NOTICE,"Retrying with SYNC...");
1149         if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
1150             redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
1151                 strerror(errno));
1152             goto error;
1153         }
1154     }
1155 
1156     /* Prepare a suitable temp file for bulk transfer */
1157     while(maxtries--) {
1158         snprintf(tmpfile,256,
1159             "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
1160         dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
1161         if (dfd != -1) break;
1162         sleep(1);
1163     }
1164     if (dfd == -1) {
1165         redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
1166         goto error;
1167     }
1168 
1169     /* Setup the non blocking download of the bulk file. */
1170     if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
1171             == AE_ERR)
1172     {
1173         redisLog(REDIS_WARNING,
1174             "Can't create readable event for SYNC: %s (fd=%d)",
1175             strerror(errno),fd);
1176         goto error;
1177     }
1178 
1179     server.repl_state = REDIS_REPL_TRANSFER;
1180     server.repl_transfer_size = -1;
1181     server.repl_transfer_read = 0;
1182     server.repl_transfer_last_fsync_off = 0;
1183     server.repl_transfer_fd = dfd;
1184     server.repl_transfer_lastio = server.unixtime;
1185     server.repl_transfer_tmpfile = zstrdup(tmpfile);
1186     return;
1187 
1188 error:
1189     close(fd);
1190     server.repl_transfer_s = -1;
1191     server.repl_state = REDIS_REPL_CONNECT;
1192     return;
1193 }
1194 
1195 int connectWithMaster(void) {
1196     int fd;
1197 
1198     fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
1199     if (fd == -1) {
1200         redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
1201             strerror(errno));
1202         return REDIS_ERR;
1203     }
1204 
1205     if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
1206             AE_ERR)
1207     {
1208         close(fd);
1209         redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
1210         return REDIS_ERR;
1211     }
1212 
1213     server.repl_transfer_lastio = server.unixtime;
1214     server.repl_transfer_s = fd;
1215     server.repl_state = REDIS_REPL_CONNECTING;
1216     return REDIS_OK;
1217 }
1218 
1219 /* This function can be called when a non blocking connection is currently
1220  * in progress to undo it. */
1221 void undoConnectWithMaster(void) {
1222     int fd = server.repl_transfer_s;
1223 
1224     redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
1225                 server.repl_state == REDIS_REPL_RECEIVE_PONG);
1226     aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
1227     close(fd);
1228     server.repl_transfer_s = -1;
1229     server.repl_state = REDIS_REPL_CONNECT;
1230 }
1231 
1232 /* This function aborts a non blocking replication attempt if there is one
1233  * in progress, by canceling the non-blocking connect attempt or
1234  * the initial bulk transfer.
1235  *
1236  * If there was a replication handshake in progress 1 is returned and
1237  * the replication state (server.repl_state) set to REDIS_REPL_CONNECT.
1238  *
1239  * Otherwise zero is returned and no operation is perforemd at all. */
1240 int cancelReplicationHandshake(void) {
1241     if (server.repl_state == REDIS_REPL_TRANSFER) {
1242         replicationAbortSyncTransfer();
1243     } else if (server.repl_state == REDIS_REPL_CONNECTING ||
1244              server.repl_state == REDIS_REPL_RECEIVE_PONG)
1245     {
1246         undoConnectWithMaster();
1247     } else {
1248         return 0;
1249     }
1250     return 1;
1251 }
1252 
1253 /* Set replication to the specified master address and port. */
1254 void replicationSetMaster(char *ip, int port) {
1255     sdsfree(server.masterhost);
1256     server.masterhost = sdsnew(ip);
1257     server.masterport = port;
1258     if (server.master) freeClient(server.master);
1259     disconnectSlaves(); /* Force our slaves to resync with us as well. */
1260     replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
1261     freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
1262     cancelReplicationHandshake();
1263     server.repl_state = REDIS_REPL_CONNECT;
1264     server.master_repl_offset = 0;
1265     server.repl_down_since = 0;
1266 }
1267 
1268 /* Cancel replication, setting the instance as a master itself. */
1269 void replicationUnsetMaster(void) {
1270     if (server.masterhost == NULL) return; /* Nothing to do. */
1271     sdsfree(server.masterhost);
1272     server.masterhost = NULL;
1273     if (server.master) {
1274         if (listLength(server.slaves) == 0) {
1275             /* If this instance is turned into a master and there are no
1276              * slaves, it inherits the replication offset from the master.
1277              * Under certain conditions this makes replicas comparable by
1278              * replication offset to understand what is the most updated. */
1279             server.master_repl_offset = server.master->reploff;
1280             freeReplicationBacklog();
1281         }
1282         freeClient(server.master);
1283     }
1284     replicationDiscardCachedMaster();
1285     cancelReplicationHandshake();
1286     server.repl_state = REDIS_REPL_NONE;
1287 }
1288 
1289 void slaveofCommand(redisClient *c) {
1290     /* SLAVEOF is not allowed in cluster mode as replication is automatically
1291      * configured using the current address of the master node. */
1292     if (server.cluster_enabled) {
1293         addReplyError(c,"SLAVEOF not allowed in cluster mode.");
1294         return;
1295     }
1296 
1297     /* The special host/port combination "NO" "ONE" turns the instance
1298      * into a master. Otherwise the new master address is set. */
1299     if (!strcasecmp(c->argv[1]->ptr,"no") &&
1300         !strcasecmp(c->argv[2]->ptr,"one")) {
1301         if (server.masterhost) {
1302             replicationUnsetMaster();
1303             redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
1304         }
1305     } else {
1306         long port;
1307 
1308         if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
1309             return;
1310 
1311         /* Check if we are already attached to the specified slave */
1312         if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
1313             && server.masterport == port) {
1314             redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
1315             addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
1316             return;
1317         }
1318         /* There was no previous master or the user specified a different one,
1319          * we can continue. */
1320         replicationSetMaster(c->argv[1]->ptr, port);
1321         redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
1322             server.masterhost, server.masterport);
1323     }
1324     addReply(c,shared.ok);
1325 }
1326 
1327 /* ROLE command: provide information about the role of the instance
1328  * (master or slave) and additional information related to replication
1329  * in an easy to process format. */
1330 void roleCommand(redisClient *c) {
1331     if (server.masterhost == NULL) {
1332         listIter li;
1333         listNode *ln;
1334         void *mbcount;
1335         int slaves = 0;
1336 
1337         addReplyMultiBulkLen(c,3);
1338         addReplyBulkCBuffer(c,"master",6);
1339         addReplyLongLong(c,server.master_repl_offset);
1340         mbcount = addDeferredMultiBulkLength(c);
1341         listRewind(server.slaves,&li);
1342         while((ln = listNext(&li))) {
1343             redisClient *slave = ln->value;
1344             char ip[REDIS_IP_STR_LEN];
1345 
1346             if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) continue;
1347             if (slave->replstate != REDIS_REPL_ONLINE) continue;
1348             addReplyMultiBulkLen(c,3);
1349             addReplyBulkCString(c,ip);
1350             addReplyBulkLongLong(c,slave->slave_listening_port);
1351             addReplyBulkLongLong(c,slave->repl_ack_off);
1352             slaves++;
1353         }
1354         setDeferredMultiBulkLength(c,mbcount,slaves);
1355     } else {
1356         char *slavestate = NULL;
1357 
1358         addReplyMultiBulkLen(c,5);
1359         addReplyBulkCBuffer(c,"slave",5);
1360         addReplyBulkCString(c,server.masterhost);
1361         addReplyLongLong(c,server.masterport);
1362         switch(server.repl_state) {
1363         case REDIS_REPL_NONE: slavestate = "none"; break;
1364         case REDIS_REPL_CONNECT: slavestate = "connect"; break;
1365         case REDIS_REPL_CONNECTING: slavestate = "connecting"; break;
1366         case REDIS_REPL_RECEIVE_PONG: /* see next */
1367         case REDIS_REPL_TRANSFER: slavestate = "sync"; break;
1368         case REDIS_REPL_CONNECTED: slavestate = "connected"; break;
1369         default: slavestate = "unknown"; break;
1370         }
1371         addReplyBulkCString(c,slavestate);
1372         addReplyLongLong(c,server.master ? server.master->reploff : -1);
1373     }
1374 }
1375 
1376 /* Send a REPLCONF ACK command to the master to inform it about the current
1377  * processed offset. If we are not connected with a master, the command has
1378  * no effects. */
1379 void replicationSendAck(void) {
1380     redisClient *c = server.master;
1381 
1382     if (c != NULL) {
1383         c->flags |= REDIS_MASTER_FORCE_REPLY;
1384         addReplyMultiBulkLen(c,3);
1385         addReplyBulkCString(c,"REPLCONF");
1386         addReplyBulkCString(c,"ACK");
1387         addReplyBulkLongLong(c,c->reploff);
1388         c->flags &= ~REDIS_MASTER_FORCE_REPLY;
1389     }
1390 }
1391 
1392 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
1393 
1394 /* In order to implement partial synchronization we need to be able to cache
1395  * our master's client structure after a transient disconnection.
1396  * It is cached into server.cached_master and flushed away using the following
1397  * functions. */
1398 
1399 /* This function is called by freeClient() in order to cache the master
1400  * client structure instead of destryoing it. freeClient() will return
1401  * ASAP after this function returns, so every action needed to avoid problems
1402  * with a client that is really "suspended" has to be done by this function.
1403  *
1404  * The other functions that will deal with the cached master are:
1405  *
1406  * replicationDiscardCachedMaster() that will make sure to kill the client
1407  * as for some reason we don't want to use it in the future.
1408  *
1409  * replicationResurrectCachedMaster() that is used after a successful PSYNC
1410  * handshake in order to reactivate the cached master.
1411  */
1412 void replicationCacheMaster(redisClient *c) {
1413     listNode *ln;
1414 
1415     redisAssert(server.master != NULL && server.cached_master == NULL);
1416     redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
1417 
1418     /* Remove from the list of clients, we don't want this client to be
1419      * listed by CLIENT LIST or processed in any way by batch operations. */
1420     ln = listSearchKey(server.clients,c);
1421     redisAssert(ln != NULL);
1422     listDelNode(server.clients,ln);
1423 
1424     /* Save the master. Server.master will be set to null later by
1425      * replicationHandleMasterDisconnection(). */
1426     server.cached_master = server.master;
1427 
1428     /* Remove the event handlers and close the socket. We'll later reuse
1429      * the socket of the new connection with the master during PSYNC. */
1430     aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1431     aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1432     close(c->fd);
1433 
1434     /* Set fd to -1 so that we can safely call freeClient(c) later. */
1435     c->fd = -1;
1436 
1437     /* Invalidate the Peer ID cache. */
1438     if (c->peerid) {
1439         sdsfree(c->peerid);
1440         c->peerid = NULL;
1441     }
1442 
1443     /* Caching the master happens instead of the actual freeClient() call,
1444      * so make sure to adjust the replication state. This function will
1445      * also set server.master to NULL. */
1446     replicationHandleMasterDisconnection();
1447 }
1448 
1449 /* Free a cached master, called when there are no longer the conditions for
1450  * a partial resync on reconnection. */
1451 void replicationDiscardCachedMaster(void) {
1452     if (server.cached_master == NULL) return;
1453 
1454     redisLog(REDIS_NOTICE,"Discarding previously cached master state.");
1455     server.cached_master->flags &= ~REDIS_MASTER;
1456     freeClient(server.cached_master);
1457     server.cached_master = NULL;
1458 }
1459 
1460 /* Turn the cached master into the current master, using the file descriptor
1461  * passed as argument as the socket for the new master.
1462  *
1463  * This funciton is called when successfully setup a partial resynchronization
1464  * so the stream of data that we'll receive will start from were this
1465  * master left. */
1466 void replicationResurrectCachedMaster(int newfd) {
1467     server.master = server.cached_master;
1468     server.cached_master = NULL;
1469     server.master->fd = newfd;
1470     server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
1471     server.master->authenticated = 1;
1472     server.master->lastinteraction = server.unixtime;
1473     server.repl_state = REDIS_REPL_CONNECTED;
1474 
1475     /* Re-add to the list of clients. */
1476     listAddNodeTail(server.clients,server.master);
1477     if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
1478                           readQueryFromClient, server.master)) {
1479         redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
1480         freeClientAsync(server.master); /* Close ASAP. */
1481     }
1482 
1483     /* We may also need to install the write handler as well if there is
1484      * pending data in the write buffers. */
1485     if (server.master->bufpos || listLength(server.master->reply)) {
1486         if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
1487                           sendReplyToClient, server.master)) {
1488             redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
1489             freeClientAsync(server.master); /* Close ASAP. */
1490         }
1491     }
1492 }
1493 
1494 /* ------------------------- MIN-SLAVES-TO-WRITE  --------------------------- */
1495 
1496 /* This function counts the number of slaves with lag <= min-slaves-max-lag.
1497  * If the option is active, the server will prevent writes if there are not
1498  * enough connected slaves with the specified lag (or less). */
1499 void refreshGoodSlavesCount(void) {
1500     listIter li;
1501     listNode *ln;
1502     int good = 0;
1503 
1504     if (!server.repl_min_slaves_to_write ||
1505         !server.repl_min_slaves_max_lag) return;
1506 
1507     listRewind(server.slaves,&li);
1508     while((ln = listNext(&li))) {
1509         redisClient *slave = ln->value;
1510         time_t lag = server.unixtime - slave->repl_ack_time;
1511 
1512         if (slave->replstate == REDIS_REPL_ONLINE &&
1513             lag <= server.repl_min_slaves_max_lag) good++;
1514     }
1515     server.repl_good_slaves_count = good;
1516 }
1517 
1518 /* ----------------------- REPLICATION SCRIPT CACHE --------------------------
1519  * The goal of this code is to keep track of scripts already sent to every
1520  * connected slave, in order to be able to replicate EVALSHA as it is without
1521  * translating it to EVAL every time it is possible.
1522  *
1523  * We use a capped collection implemented by a hash table for fast lookup
1524  * of scripts we can send as EVALSHA, plus a linked list that is used for
1525  * eviction of the oldest entry when the max number of items is reached.
1526  *
1527  * We don't care about taking a different cache for every different slave
1528  * since to fill the cache again is not very costly, the goal of this code
1529  * is to avoid that the same big script is transmitted a big number of times
1530  * per second wasting bandwidth and processor speed, but it is not a problem
1531  * if we need to rebuild the cache from scratch from time to time, every used
1532  * script will need to be transmitted a single time to reappear in the cache.
1533  *
1534  * This is how the system works:
1535  *
1536  * 1) Every time a new slave connects, we flush the whole script cache.
1537  * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
1538  *    trying to convert EVAL into EVALSHA specifically for slaves.
1539  * 3) Every time we transmit a script as EVAL to the slaves, we also add the
1540  *    corresponding SHA1 of the script into the cache as we are sure every
1541  *    slave knows about the script starting from now.
1542  * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
1543  *    and at the same time flush the script cache.
1544  * 5) When the last slave disconnects, flush the cache.
1545  * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
1546  *    in the master sometimes.
1547  */
1548 
1549 /* Initialize the script cache, only called at startup. */
1550 void replicationScriptCacheInit(void) {
1551     server.repl_scriptcache_size = 10000;
1552     server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
1553     server.repl_scriptcache_fifo = listCreate();
1554 }
1555 
1556 /* Empty the script cache. Should be called every time we are no longer sure
1557  * that every slave knows about all the scripts in our set, or when the
1558  * current AOF "context" is no longer aware of the script. In general we
1559  * should flush the cache:
1560  *
1561  * 1) Every time a new slave reconnects to this master and performs a
1562  *    full SYNC (PSYNC does not require flushing).
1563  * 2) Every time an AOF rewrite is performed.
1564  * 3) Every time we are left without slaves at all, and AOF is off, in order
1565  *    to reclaim otherwise unused memory.
1566  */
1567 void replicationScriptCacheFlush(void) {
1568     dictEmpty(server.repl_scriptcache_dict,NULL);
1569     listRelease(server.repl_scriptcache_fifo);
1570     server.repl_scriptcache_fifo = listCreate();
1571 }
1572 
1573 /* Add an entry into the script cache, if we reach max number of entries the
1574  * oldest is removed from the list. */
1575 void replicationScriptCacheAdd(sds sha1) {
1576     int retval;
1577     sds key = sdsdup(sha1);
1578 
1579     /* Evict oldest. */
1580     if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
1581     {
1582         listNode *ln = listLast(server.repl_scriptcache_fifo);
1583         sds oldest = listNodeValue(ln);
1584 
1585         retval = dictDelete(server.repl_scriptcache_dict,oldest);
1586         redisAssert(retval == DICT_OK);
1587         listDelNode(server.repl_scriptcache_fifo,ln);
1588     }
1589 
1590     /* Add current. */
1591     retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
1592     listAddNodeHead(server.repl_scriptcache_fifo,key);
1593     redisAssert(retval == DICT_OK);
1594 }
1595 
1596 /* Returns non-zero if the specified entry exists inside the cache, that is,
1597  * if all the slaves are aware of this script SHA1. */
1598 int replicationScriptCacheExists(sds sha1) {
1599     return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
1600 }
1601 
1602 /* ----------------------- SYNCHRONOUS REPLICATION --------------------------
1603  * Redis synchronous replication design can be summarized in points:
1604  *
1605  * - Redis masters have a global replication offset, used by PSYNC.
1606  * - Masters increment the offset every time new commands are sent to slaves.
1607  * - Slaves ping back masters with the offset processed so far.
1608  *
1609  * So synchronous replication adds a new WAIT command in the form:
1610  *
1611  *   WAIT <num_replicas> <milliseconds_timeout>
1612  *
1613  * That returns the number of replicas that processed the query when
1614  * we finally have at least num_replicas, or when the timeout was
1615  * reached.
1616  *
1617  * The command is implemented in this way:
1618  *
1619  * - Every time a client processes a command, we remember the replication
1620  *   offset after sending that command to the slaves.
1621  * - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
1622  *   The client is blocked at the same time (see blocked.c).
1623  * - Once we receive enough ACKs for a given offset or when the timeout
1624  *   is reached, the WAIT command is unblocked and the reply sent to the
1625  *   client.
1626  */
1627 
1628 /* This just set a flag so that we broadcast a REPLCONF GETACK command
1629  * to all the slaves in the beforeSleep() function. Note that this way
1630  * we "group" all the clients that want to wait for synchronouns replication
1631  * in a given event loop iteration, and send a single GETACK for them all. */
1632 void replicationRequestAckFromSlaves(void) {
1633     server.get_ack_from_slaves = 1;
1634 }
1635 
1636 /* Return the number of slaves that already acknowledged the specified
1637  * replication offset. */
1638 int replicationCountAcksByOffset(long long offset) {
1639     listIter li;
1640     listNode *ln;
1641     int count = 0;
1642 
1643     listRewind(server.slaves,&li);
1644     while((ln = listNext(&li))) {
1645         redisClient *slave = ln->value;
1646 
1647         if (slave->replstate != REDIS_REPL_ONLINE) continue;
1648         if (slave->repl_ack_off >= offset) count++;
1649     }
1650     return count;
1651 }
1652 
1653 /* WAIT for N replicas to acknowledge the processing of our latest
1654  * write command (and all the previous commands). */
1655 void waitCommand(redisClient *c) {
1656     mstime_t timeout;
1657     long numreplicas, ackreplicas;
1658     long long offset = c->woff;
1659 
1660     /* Argument parsing. */
1661     if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != REDIS_OK)
1662         return;
1663     if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
1664         != REDIS_OK) return;
1665 
1666     /* First try without blocking at all. */
1667     ackreplicas = replicationCountAcksByOffset(c->woff);
1668     if (ackreplicas >= numreplicas || c->flags & REDIS_MULTI) {
1669         addReplyLongLong(c,ackreplicas);
1670         return;
1671     }
1672 
1673     /* Otherwise block the client and put it into our list of clients
1674      * waiting for ack from slaves. */
1675     c->bpop.timeout = timeout;
1676     c->bpop.reploffset = offset;
1677     c->bpop.numreplicas = numreplicas;
1678     listAddNodeTail(server.clients_waiting_acks,c);
1679     blockClient(c,REDIS_BLOCKED_WAIT);
1680 
1681     /* Make sure that the server will send an ACK request to all the slaves
1682      * before returning to the event loop. */
1683     replicationRequestAckFromSlaves();
1684 }
1685 
1686 /* This is called by unblockClient() to perform the blocking op type
1687  * specific cleanup. We just remove the client from the list of clients
1688  * waiting for replica acks. Never call it directly, call unblockClient()
1689  * instead. */
1690 void unblockClientWaitingReplicas(redisClient *c) {
1691     listNode *ln = listSearchKey(server.clients_waiting_acks,c);
1692     redisAssert(ln != NULL);
1693     listDelNode(server.clients_waiting_acks,ln);
1694 }
1695 
1696 /* Check if there are clients blocked in WAIT that can be unblocked since
1697  * we received enough ACKs from slaves. */
1698 void processClientsWaitingReplicas(void) {
1699     long long last_offset = 0;
1700     int last_numreplicas = 0;
1701 
1702     listIter li;
1703     listNode *ln;
1704 
1705     listRewind(server.clients_waiting_acks,&li);
1706     while((ln = listNext(&li))) {
1707         redisClient *c = ln->value;
1708 
1709         /* Every time we find a client that is satisfied for a given
1710          * offset and number of replicas, we remember it so the next client
1711          * may be unblocked without calling replicationCountAcksByOffset()
1712          * if the requested offset / replicas were equal or less. */
1713         if (last_offset && last_offset > c->bpop.reploffset &&
1714                            last_numreplicas > c->bpop.numreplicas)
1715         {
1716             unblockClient(c);
1717             addReplyLongLong(c,last_numreplicas);
1718         } else {
1719             int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
1720 
1721             if (numreplicas >= c->bpop.numreplicas) {
1722                 last_offset = c->bpop.reploffset;
1723                 last_numreplicas = numreplicas;
1724                 unblockClient(c);
1725                 addReplyLongLong(c,numreplicas);
1726             }
1727         }
1728     }
1729 }
1730 
1731 /* Return the slave replication offset for this instance, that is
1732  * the offset for which we already processed the master replication stream. */
1733 long long replicationGetSlaveOffset(void) {
1734     long long offset = 0;
1735 
1736     if (server.masterhost != NULL) {
1737         if (server.master) {
1738             offset = server.master->reploff;
1739         } else if (server.cached_master) {
1740             offset = server.cached_master->reploff;
1741         }
1742     }
1743     /* offset may be -1 when the master does not support it at all, however
1744      * this function is designed to return an offset that can express the
1745      * amount of data processed by the master, so we return a positive
1746      * integer. */
1747     if (offset < 0) offset = 0;
1748     return offset;
1749 }
1750 
1751 /* --------------------------- REPLICATION CRON  ---------------------------- */
1752 
1753 /* Replication cron funciton, called 1 time per second. */
1754 void replicationCron(void) {
1755     /* Non blocking connection timeout? */
1756     if (server.masterhost &&
1757         (server.repl_state == REDIS_REPL_CONNECTING ||
1758          server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
1759         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
1760     {
1761         redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
1762         undoConnectWithMaster();
1763     }
1764 
1765     /* Bulk transfer I/O timeout? */
1766     if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER &&
1767         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
1768     {
1769         redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
1770         replicationAbortSyncTransfer();
1771     }
1772 
1773     /* Timed out master when we are an already connected slave? */
1774     if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
1775         (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
1776     {
1777         redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");
1778         freeClient(server.master);
1779     }
1780 
1781     /* Check if we should connect to a MASTER */
1782     if (server.repl_state == REDIS_REPL_CONNECT) {
1783         redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
1784             server.masterhost, server.masterport);
1785         if (connectWithMaster() == REDIS_OK) {
1786             redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
1787         }
1788     }
1789 
1790     /* Send ACK to master from time to time.
1791      * Note that we do not send periodic acks to masters that don't
1792      * support PSYNC and replication offsets. */
1793     if (server.masterhost && server.master &&
1794         !(server.master->flags & REDIS_PRE_PSYNC))
1795         replicationSendAck();
1796 
1797     /* If we have attached slaves, PING them from time to time.
1798      * So slaves can implement an explicit timeout to masters, and will
1799      * be able to detect a link disconnection even if the TCP connection
1800      * will not actually go down. */
1801     if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {
1802         listIter li;
1803         listNode *ln;
1804         robj *ping_argv[1];
1805 
1806         /* First, send PING */
1807         ping_argv[0] = createStringObject("PING",4);
1808         replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
1809         decrRefCount(ping_argv[0]);
1810 
1811         /* Second, send a newline to all the slaves in pre-synchronization
1812          * stage, that is, slaves waiting for the master to create the RDB file.
1813          * The newline will be ignored by the slave but will refresh the
1814          * last-io timer preventing a timeout. */
1815         listRewind(server.slaves,&li);
1816         while((ln = listNext(&li))) {
1817             redisClient *slave = ln->value;
1818 
1819             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
1820                 slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
1821                 if (write(slave->fd, "\n", 1) == -1) {
1822                     /* Don't worry, it's just a ping. */
1823                 }
1824             }
1825         }
1826     }
1827 
1828     /* Disconnect timedout slaves. */
1829     if (listLength(server.slaves)) {
1830         listIter li;
1831         listNode *ln;
1832 
1833         listRewind(server.slaves,&li);
1834         while((ln = listNext(&li))) {
1835             redisClient *slave = ln->value;
1836 
1837             if (slave->replstate != REDIS_REPL_ONLINE) continue;
1838             if (slave->flags & REDIS_PRE_PSYNC) continue;
1839             if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
1840             {
1841                 char ip[REDIS_IP_STR_LEN];
1842                 int port;
1843 
1844                 if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) {
1845                     redisLog(REDIS_WARNING,
1846                         "Disconnecting timedout slave: %s:%d",
1847                         ip, slave->slave_listening_port);
1848                 }
1849                 freeClient(slave);
1850             }
1851         }
1852     }
1853 
1854     /* If we have no attached slaves and there is a replication backlog
1855      * using memory, free it after some (configured) time. */
1856     if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
1857         server.repl_backlog)
1858     {
1859         time_t idle = server.unixtime - server.repl_no_slaves_since;
1860 
1861         if (idle > server.repl_backlog_time_limit) {
1862             freeReplicationBacklog();
1863             redisLog(REDIS_NOTICE,
1864                 "Replication backlog freed after %d seconds "
1865                 "without connected slaves.",
1866                 (int) server.repl_backlog_time_limit);
1867         }
1868     }
1869 
1870     /* If AOF is disabled and we no longer have attached slaves, we can
1871      * free our Replication Script Cache as there is no need to propagate
1872      * EVALSHA at all. */
1873     if (listLength(server.slaves) == 0 &&
1874         server.aof_state == REDIS_AOF_OFF &&
1875         listLength(server.repl_scriptcache_fifo) != 0)
1876     {
1877         replicationScriptCacheFlush();
1878     }
1879 
1880     /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
1881     refreshGoodSlavesCount();
1882 }
1883