xref: /redis-3.2.3/src/replication.c (revision fdafe233)
1 /* Asynchronous replication implementation.
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 
32 #include "server.h"
33 
34 #include <sys/time.h>
35 #include <unistd.h>
36 #include <fcntl.h>
37 #include <sys/socket.h>
38 #include <sys/stat.h>
39 
40 void replicationDiscardCachedMaster(void);
41 void replicationResurrectCachedMaster(int newfd);
42 void replicationSendAck(void);
43 void putSlaveOnline(client *slave);
44 int cancelReplicationHandshake(void);
45 
46 /* --------------------------- Utility functions ---------------------------- */
47 
48 /* Return the pointer to a string representing the slave ip:listening_port
49  * pair. Mostly useful for logging, since we want to log a slave using its
50  * IP address and it's listening port which is more clear for the user, for
51  * example: "Closing connection with slave 10.1.2.3:6380". */
52 char *replicationGetSlaveName(client *c) {
53     static char buf[NET_PEER_ID_LEN];
54     char ip[NET_IP_STR_LEN];
55 
56     ip[0] = '\0';
57     buf[0] = '\0';
58     if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) {
59         if (c->slave_listening_port)
60             anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
61         else
62             snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip);
63     } else {
64         snprintf(buf,sizeof(buf),"client id #%llu",
65             (unsigned long long) c->id);
66     }
67     return buf;
68 }
69 
70 /* ---------------------------------- MASTER -------------------------------- */
71 
72 void createReplicationBacklog(void) {
73     serverAssert(server.repl_backlog == NULL);
74     server.repl_backlog = zmalloc(server.repl_backlog_size);
75     server.repl_backlog_histlen = 0;
76     server.repl_backlog_idx = 0;
77     /* When a new backlog buffer is created, we increment the replication
78      * offset by one to make sure we'll not be able to PSYNC with any
79      * previous slave. This is needed because we avoid incrementing the
80      * master_repl_offset if no backlog exists nor slaves are attached. */
81     server.master_repl_offset++;
82 
83     /* We don't have any data inside our buffer, but virtually the first
84      * byte we have is the next byte that will be generated for the
85      * replication stream. */
86     server.repl_backlog_off = server.master_repl_offset+1;
87 }
88 
89 /* This function is called when the user modifies the replication backlog
90  * size at runtime. It is up to the function to both update the
91  * server.repl_backlog_size and to resize the buffer and setup it so that
92  * it contains the same data as the previous one (possibly less data, but
93  * the most recent bytes, or the same data and more free space in case the
94  * buffer is enlarged). */
95 void resizeReplicationBacklog(long long newsize) {
96     if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
97         newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
98     if (server.repl_backlog_size == newsize) return;
99 
100     server.repl_backlog_size = newsize;
101     if (server.repl_backlog != NULL) {
102         /* What we actually do is to flush the old buffer and realloc a new
103          * empty one. It will refill with new data incrementally.
104          * The reason is that copying a few gigabytes adds latency and even
105          * worse often we need to alloc additional space before freeing the
106          * old buffer. */
107         zfree(server.repl_backlog);
108         server.repl_backlog = zmalloc(server.repl_backlog_size);
109         server.repl_backlog_histlen = 0;
110         server.repl_backlog_idx = 0;
111         /* Next byte we have is... the next since the buffer is empty. */
112         server.repl_backlog_off = server.master_repl_offset+1;
113     }
114 }
115 
116 void freeReplicationBacklog(void) {
117     serverAssert(listLength(server.slaves) == 0);
118     zfree(server.repl_backlog);
119     server.repl_backlog = NULL;
120 }
121 
122 /* Add data to the replication backlog.
123  * This function also increments the global replication offset stored at
124  * server.master_repl_offset, because there is no case where we want to feed
125  * the backlog without incrementing the buffer. */
126 void feedReplicationBacklog(void *ptr, size_t len) {
127     unsigned char *p = ptr;
128 
129     server.master_repl_offset += len;
130 
131     /* This is a circular buffer, so write as much data we can at every
132      * iteration and rewind the "idx" index if we reach the limit. */
133     while(len) {
134         size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
135         if (thislen > len) thislen = len;
136         memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
137         server.repl_backlog_idx += thislen;
138         if (server.repl_backlog_idx == server.repl_backlog_size)
139             server.repl_backlog_idx = 0;
140         len -= thislen;
141         p += thislen;
142         server.repl_backlog_histlen += thislen;
143     }
144     if (server.repl_backlog_histlen > server.repl_backlog_size)
145         server.repl_backlog_histlen = server.repl_backlog_size;
146     /* Set the offset of the first byte we have in the backlog. */
147     server.repl_backlog_off = server.master_repl_offset -
148                               server.repl_backlog_histlen + 1;
149 }
150 
151 /* Wrapper for feedReplicationBacklog() that takes Redis string objects
152  * as input. */
153 void feedReplicationBacklogWithObject(robj *o) {
154     char llstr[LONG_STR_SIZE];
155     void *p;
156     size_t len;
157 
158     if (o->encoding == OBJ_ENCODING_INT) {
159         len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
160         p = llstr;
161     } else {
162         len = sdslen(o->ptr);
163         p = o->ptr;
164     }
165     feedReplicationBacklog(p,len);
166 }
167 
168 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
169     listNode *ln;
170     listIter li;
171     int j, len;
172     char llstr[LONG_STR_SIZE];
173 
174     /* If there aren't slaves, and there is no backlog buffer to populate,
175      * we can return ASAP. */
176     if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
177 
178     /* We can't have slaves attached and no backlog. */
179     serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
180 
181     /* Send SELECT command to every slave if needed. */
182     if (server.slaveseldb != dictid) {
183         robj *selectcmd;
184 
185         /* For a few DBs we have pre-computed SELECT command. */
186         if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
187             selectcmd = shared.select[dictid];
188         } else {
189             int dictid_len;
190 
191             dictid_len = ll2string(llstr,sizeof(llstr),dictid);
192             selectcmd = createObject(OBJ_STRING,
193                 sdscatprintf(sdsempty(),
194                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
195                 dictid_len, llstr));
196         }
197 
198         /* Add the SELECT command into the backlog. */
199         if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
200 
201         /* Send it to slaves. */
202         listRewind(slaves,&li);
203         while((ln = listNext(&li))) {
204             client *slave = ln->value;
205             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
206             addReply(slave,selectcmd);
207         }
208 
209         if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
210             decrRefCount(selectcmd);
211     }
212     server.slaveseldb = dictid;
213 
214     /* Write the command to the replication backlog if any. */
215     if (server.repl_backlog) {
216         char aux[LONG_STR_SIZE+3];
217 
218         /* Add the multi bulk reply length. */
219         aux[0] = '*';
220         len = ll2string(aux+1,sizeof(aux)-1,argc);
221         aux[len+1] = '\r';
222         aux[len+2] = '\n';
223         feedReplicationBacklog(aux,len+3);
224 
225         for (j = 0; j < argc; j++) {
226             long objlen = stringObjectLen(argv[j]);
227 
228             /* We need to feed the buffer with the object as a bulk reply
229              * not just as a plain string, so create the $..CRLF payload len
230              * and add the final CRLF */
231             aux[0] = '$';
232             len = ll2string(aux+1,sizeof(aux)-1,objlen);
233             aux[len+1] = '\r';
234             aux[len+2] = '\n';
235             feedReplicationBacklog(aux,len+3);
236             feedReplicationBacklogWithObject(argv[j]);
237             feedReplicationBacklog(aux+len+1,2);
238         }
239     }
240 
241     /* Write the command to every slave. */
242     listRewind(server.slaves,&li);
243     while((ln = listNext(&li))) {
244         client *slave = ln->value;
245 
246         /* Don't feed slaves that are still waiting for BGSAVE to start */
247         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
248 
249         /* Feed slaves that are waiting for the initial SYNC (so these commands
250          * are queued in the output buffer until the initial SYNC completes),
251          * or are already in sync with the master. */
252 
253         /* Add the multi bulk length. */
254         addReplyMultiBulkLen(slave,argc);
255 
256         /* Finally any additional argument that was not stored inside the
257          * static buffer if any (from j to argc). */
258         for (j = 0; j < argc; j++)
259             addReplyBulk(slave,argv[j]);
260     }
261 }
262 
263 void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
264     listNode *ln;
265     listIter li;
266     int j;
267     sds cmdrepr = sdsnew("+");
268     robj *cmdobj;
269     struct timeval tv;
270 
271     gettimeofday(&tv,NULL);
272     cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
273     if (c->flags & CLIENT_LUA) {
274         cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
275     } else if (c->flags & CLIENT_UNIX_SOCKET) {
276         cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
277     } else {
278         cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
279     }
280 
281     for (j = 0; j < argc; j++) {
282         if (argv[j]->encoding == OBJ_ENCODING_INT) {
283             cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
284         } else {
285             cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
286                         sdslen(argv[j]->ptr));
287         }
288         if (j != argc-1)
289             cmdrepr = sdscatlen(cmdrepr," ",1);
290     }
291     cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
292     cmdobj = createObject(OBJ_STRING,cmdrepr);
293 
294     listRewind(monitors,&li);
295     while((ln = listNext(&li))) {
296         client *monitor = ln->value;
297         addReply(monitor,cmdobj);
298     }
299     decrRefCount(cmdobj);
300 }
301 
302 /* Feed the slave 'c' with the replication backlog starting from the
303  * specified 'offset' up to the end of the backlog. */
304 long long addReplyReplicationBacklog(client *c, long long offset) {
305     long long j, skip, len;
306 
307     serverLog(LL_DEBUG, "[PSYNC] Slave request offset: %lld", offset);
308 
309     if (server.repl_backlog_histlen == 0) {
310         serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
311         return 0;
312     }
313 
314     serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
315              server.repl_backlog_size);
316     serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
317              server.repl_backlog_off);
318     serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
319              server.repl_backlog_histlen);
320     serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
321              server.repl_backlog_idx);
322 
323     /* Compute the amount of bytes we need to discard. */
324     skip = offset - server.repl_backlog_off;
325     serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
326 
327     /* Point j to the oldest byte, that is actaully our
328      * server.repl_backlog_off byte. */
329     j = (server.repl_backlog_idx +
330         (server.repl_backlog_size-server.repl_backlog_histlen)) %
331         server.repl_backlog_size;
332     serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
333 
334     /* Discard the amount of data to seek to the specified 'offset'. */
335     j = (j + skip) % server.repl_backlog_size;
336 
337     /* Feed slave with data. Since it is a circular buffer we have to
338      * split the reply in two parts if we are cross-boundary. */
339     len = server.repl_backlog_histlen - skip;
340     serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
341     while(len) {
342         long long thislen =
343             ((server.repl_backlog_size - j) < len) ?
344             (server.repl_backlog_size - j) : len;
345 
346         serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
347         addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
348         len -= thislen;
349         j = 0;
350     }
351     return server.repl_backlog_histlen - skip;
352 }
353 
354 /* Return the offset to provide as reply to the PSYNC command received
355  * from the slave. The returned value is only valid immediately after
356  * the BGSAVE process started and before executing any other command
357  * from clients. */
358 long long getPsyncInitialOffset(void) {
359     long long psync_offset = server.master_repl_offset;
360     /* Add 1 to psync_offset if it the replication backlog does not exists
361      * as when it will be created later we'll increment the offset by one. */
362     if (server.repl_backlog == NULL) psync_offset++;
363     return psync_offset;
364 }
365 
366 /* Send a FULLRESYNC reply in the specific case of a full resynchronization,
367  * as a side effect setup the slave for a full sync in different ways:
368  *
369  * 1) Remember, into the slave client structure, the offset we sent
370  *    here, so that if new slaves will later attach to the same
371  *    background RDB saving process (by duplicating this client output
372  *    buffer), we can get the right offset from this slave.
373  * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
374  *    we start accumulating differences from this point.
375  * 3) Force the replication stream to re-emit a SELECT statement so
376  *    the new slave incremental differences will start selecting the
377  *    right database number.
378  *
379  * Normally this function should be called immediately after a successful
380  * BGSAVE for replication was started, or when there is one already in
381  * progress that we attached our slave to. */
382 int replicationSetupSlaveForFullResync(client *slave, long long offset) {
383     char buf[128];
384     int buflen;
385 
386     slave->psync_initial_offset = offset;
387     slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
388     /* We are going to accumulate the incremental changes for this
389      * slave as well. Set slaveseldb to -1 in order to force to re-emit
390      * a SLEECT statement in the replication stream. */
391     server.slaveseldb = -1;
392 
393     /* Don't send this reply to slaves that approached us with
394      * the old SYNC command. */
395     if (!(slave->flags & CLIENT_PRE_PSYNC)) {
396         buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
397                           server.runid,offset);
398         if (write(slave->fd,buf,buflen) != buflen) {
399             freeClientAsync(slave);
400             return C_ERR;
401         }
402     }
403     return C_OK;
404 }
405 
406 /* This function handles the PSYNC command from the point of view of a
407  * master receiving a request for partial resynchronization.
408  *
409  * On success return C_OK, otherwise C_ERR is returned and we proceed
410  * with the usual full resync. */
411 int masterTryPartialResynchronization(client *c) {
412     long long psync_offset, psync_len;
413     char *master_runid = c->argv[1]->ptr;
414     char buf[128];
415     int buflen;
416 
417     /* Is the runid of this master the same advertised by the wannabe slave
418      * via PSYNC? If runid changed this master is a different instance and
419      * there is no way to continue. */
420     if (strcasecmp(master_runid, server.runid)) {
421         /* Run id "?" is used by slaves that want to force a full resync. */
422         if (master_runid[0] != '?') {
423             serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
424                 "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
425                 master_runid, server.runid);
426         } else {
427             serverLog(LL_NOTICE,"Full resync requested by slave %s",
428                 replicationGetSlaveName(c));
429         }
430         goto need_full_resync;
431     }
432 
433     /* We still have the data our slave is asking for? */
434     if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
435        C_OK) goto need_full_resync;
436     if (!server.repl_backlog ||
437         psync_offset < server.repl_backlog_off ||
438         psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
439     {
440         serverLog(LL_NOTICE,
441             "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
442         if (psync_offset > server.master_repl_offset) {
443             serverLog(LL_WARNING,
444                 "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
445         }
446         goto need_full_resync;
447     }
448 
449     /* If we reached this point, we are able to perform a partial resync:
450      * 1) Set client state to make it a slave.
451      * 2) Inform the client we can continue with +CONTINUE
452      * 3) Send the backlog data (from the offset to the end) to the slave. */
453     c->flags |= CLIENT_SLAVE;
454     c->replstate = SLAVE_STATE_ONLINE;
455     c->repl_ack_time = server.unixtime;
456     c->repl_put_online_on_ack = 0;
457     listAddNodeTail(server.slaves,c);
458     /* We can't use the connection buffers since they are used to accumulate
459      * new commands at this stage. But we are sure the socket send buffer is
460      * empty so this write will never fail actually. */
461     buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
462     if (write(c->fd,buf,buflen) != buflen) {
463         freeClientAsync(c);
464         return C_OK;
465     }
466     psync_len = addReplyReplicationBacklog(c,psync_offset);
467     serverLog(LL_NOTICE,
468         "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
469             replicationGetSlaveName(c),
470             psync_len, psync_offset);
471     /* Note that we don't need to set the selected DB at server.slaveseldb
472      * to -1 to force the master to emit SELECT, since the slave already
473      * has this state from the previous connection with the master. */
474 
475     refreshGoodSlavesCount();
476     return C_OK; /* The caller can return, no full resync needed. */
477 
478 need_full_resync:
479     /* We need a full resync for some reason... Note that we can't
480      * reply to PSYNC right now if a full SYNC is needed. The reply
481      * must include the master offset at the time the RDB file we transfer
482      * is generated, so we need to delay the reply to that moment. */
483     return C_ERR;
484 }
485 
486 /* Start a BGSAVE for replication goals, which is, selecting the disk or
487  * socket target depending on the configuration, and making sure that
488  * the script cache is flushed before to start.
489  *
490  * The mincapa argument is the bitwise AND among all the slaves capabilities
491  * of the slaves waiting for this BGSAVE, so represents the slave capabilities
492  * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
493  *
494  * Side effects, other than starting a BGSAVE:
495  *
496  * 1) Handle the slaves in WAIT_START state, by preparing them for a full
497  *    sync if the BGSAVE was succesfully started, or sending them an error
498  *    and dropping them from the list of slaves.
499  *
500  * 2) Flush the Lua scripting script cache if the BGSAVE was actually
501  *    started.
502  *
503  * Returns C_OK on success or C_ERR otherwise. */
504 int startBgsaveForReplication(int mincapa) {
505     int retval;
506     int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
507     listIter li;
508     listNode *ln;
509 
510     serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
511         socket_target ? "slaves sockets" : "disk");
512 
513     if (socket_target)
514         retval = rdbSaveToSlavesSockets();
515     else
516         retval = rdbSaveBackground(server.rdb_filename);
517 
518     /* If we failed to BGSAVE, remove the slaves waiting for a full
519      * resynchorinization from the list of salves, inform them with
520      * an error about what happened, close the connection ASAP. */
521     if (retval == C_ERR) {
522         serverLog(LL_WARNING,"BGSAVE for replication failed");
523         listRewind(server.slaves,&li);
524         while((ln = listNext(&li))) {
525             client *slave = ln->value;
526 
527             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
528                 slave->flags &= ~CLIENT_SLAVE;
529                 listDelNode(server.slaves,ln);
530                 addReplyError(slave,
531                     "BGSAVE failed, replication can't continue");
532                 slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
533             }
534         }
535         return retval;
536     }
537 
538     /* If the target is socket, rdbSaveToSlavesSockets() already setup
539      * the salves for a full resync. Otherwise for disk target do it now.*/
540     if (!socket_target) {
541         listRewind(server.slaves,&li);
542         while((ln = listNext(&li))) {
543             client *slave = ln->value;
544 
545             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
546                     replicationSetupSlaveForFullResync(slave,
547                             getPsyncInitialOffset());
548             }
549         }
550     }
551 
552     /* Flush the script cache, since we need that slave differences are
553      * accumulated without requiring slaves to match our cached scripts. */
554     if (retval == C_OK) replicationScriptCacheFlush();
555     return retval;
556 }
557 
558 /* SYNC and PSYNC command implemenation. */
559 void syncCommand(client *c) {
560     /* ignore SYNC if already slave or in monitor mode */
561     if (c->flags & CLIENT_SLAVE) return;
562 
563     /* Refuse SYNC requests if we are a slave but the link with our master
564      * is not ok... */
565     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
566         addReplyError(c,"Can't SYNC while not connected with my master");
567         return;
568     }
569 
570     /* SYNC can't be issued when the server has pending data to send to
571      * the client about already issued commands. We need a fresh reply
572      * buffer registering the differences between the BGSAVE and the current
573      * dataset, so that we can copy to other slaves if needed. */
574     if (clientHasPendingReplies(c)) {
575         addReplyError(c,"SYNC and PSYNC are invalid with pending output");
576         return;
577     }
578 
579     serverLog(LL_NOTICE,"Slave %s asks for synchronization",
580         replicationGetSlaveName(c));
581 
582     /* Try a partial resynchronization if this is a PSYNC command.
583      * If it fails, we continue with usual full resynchronization, however
584      * when this happens masterTryPartialResynchronization() already
585      * replied with:
586      *
587      * +FULLRESYNC <runid> <offset>
588      *
589      * So the slave knows the new runid and offset to try a PSYNC later
590      * if the connection with the master is lost. */
591     if (!strcasecmp(c->argv[0]->ptr,"psync")) {
592         if (masterTryPartialResynchronization(c) == C_OK) {
593             server.stat_sync_partial_ok++;
594             return; /* No full resync needed, return. */
595         } else {
596             char *master_runid = c->argv[1]->ptr;
597 
598             /* Increment stats for failed PSYNCs, but only if the
599              * runid is not "?", as this is used by slaves to force a full
600              * resync on purpose when they are not albe to partially
601              * resync. */
602             if (master_runid[0] != '?') server.stat_sync_partial_err++;
603         }
604     } else {
605         /* If a slave uses SYNC, we are dealing with an old implementation
606          * of the replication protocol (like redis-cli --slave). Flag the client
607          * so that we don't expect to receive REPLCONF ACK feedbacks. */
608         c->flags |= CLIENT_PRE_PSYNC;
609     }
610 
611     /* Full resynchronization. */
612     server.stat_sync_full++;
613 
614     /* Setup the slave as one waiting for BGSAVE to start. The following code
615      * paths will change the state if we handle the slave differently. */
616     c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
617     if (server.repl_disable_tcp_nodelay)
618         anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
619     c->repldbfd = -1;
620     c->flags |= CLIENT_SLAVE;
621     listAddNodeTail(server.slaves,c);
622 
623     /* CASE 1: BGSAVE is in progress, with disk target. */
624     if (server.rdb_child_pid != -1 &&
625         server.rdb_child_type == RDB_CHILD_TYPE_DISK)
626     {
627         /* Ok a background save is in progress. Let's check if it is a good
628          * one for replication, i.e. if there is another slave that is
629          * registering differences since the server forked to save. */
630         client *slave;
631         listNode *ln;
632         listIter li;
633 
634         listRewind(server.slaves,&li);
635         while((ln = listNext(&li))) {
636             slave = ln->value;
637             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
638         }
639         /* To attach this slave, we check that it has at least all the
640          * capabilities of the slave that triggered the current BGSAVE. */
641         if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
642             /* Perfect, the server is already registering differences for
643              * another slave. Set the right state, and copy the buffer. */
644             copyClientOutputBuffer(c,slave);
645             replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
646             serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
647         } else {
648             /* No way, we need to wait for the next BGSAVE in order to
649              * register differences. */
650             serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC");
651         }
652 
653     /* CASE 2: BGSAVE is in progress, with socket target. */
654     } else if (server.rdb_child_pid != -1 &&
655                server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
656     {
657         /* There is an RDB child process but it is writing directly to
658          * children sockets. We need to wait for the next BGSAVE
659          * in order to synchronize. */
660         serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
661 
662     /* CASE 3: There is no BGSAVE is progress. */
663     } else {
664         if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
665             /* Diskless replication RDB child is created inside
666              * replicationCron() since we want to delay its start a
667              * few seconds to wait for more slaves to arrive. */
668             if (server.repl_diskless_sync_delay)
669                 serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
670         } else {
671             /* Target is disk (or the slave is not capable of supporting
672              * diskless replication) and we don't have a BGSAVE in progress,
673              * let's start one. */
674             if (server.aof_child_pid != -1) {
675                 startBgsaveForReplication(c->slave_capa);
676             } else {
677                 serverLog(LL_NOTICE,
678                     "No BGSAVE in progress, but an AOF rewrite is active. "
679                     "BGSAVE for replication delayed");
680             }
681         }
682     }
683 
684     if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
685         createReplicationBacklog();
686     return;
687 }
688 
689 /* REPLCONF <option> <value> <option> <value> ...
690  * This command is used by a slave in order to configure the replication
691  * process before starting it with the SYNC command.
692  *
693  * Currently the only use of this command is to communicate to the master
694  * what is the listening port of the Slave redis instance, so that the
695  * master can accurately list slaves and their listening ports in
696  * the INFO output.
697  *
698  * In the future the same command can be used in order to configure
699  * the replication to initiate an incremental replication instead of a
700  * full resync. */
701 void replconfCommand(client *c) {
702     int j;
703 
704     if ((c->argc % 2) == 0) {
705         /* Number of arguments must be odd to make sure that every
706          * option has a corresponding value. */
707         addReply(c,shared.syntaxerr);
708         return;
709     }
710 
711     /* Process every option-value pair. */
712     for (j = 1; j < c->argc; j+=2) {
713         if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
714             long port;
715 
716             if ((getLongFromObjectOrReply(c,c->argv[j+1],
717                     &port,NULL) != C_OK))
718                 return;
719             c->slave_listening_port = port;
720         } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
721             /* Ignore capabilities not understood by this master. */
722             if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
723                 c->slave_capa |= SLAVE_CAPA_EOF;
724         } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
725             /* REPLCONF ACK is used by slave to inform the master the amount
726              * of replication stream that it processed so far. It is an
727              * internal only command that normal clients should never use. */
728             long long offset;
729 
730             if (!(c->flags & CLIENT_SLAVE)) return;
731             if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
732                 return;
733             if (offset > c->repl_ack_off)
734                 c->repl_ack_off = offset;
735             c->repl_ack_time = server.unixtime;
736             /* If this was a diskless replication, we need to really put
737              * the slave online when the first ACK is received (which
738              * confirms slave is online and ready to get more data). */
739             if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
740                 putSlaveOnline(c);
741             /* Note: this command does not reply anything! */
742             return;
743         } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
744             /* REPLCONF GETACK is used in order to request an ACK ASAP
745              * to the slave. */
746             if (server.masterhost && server.master) replicationSendAck();
747             /* Note: this command does not reply anything! */
748         } else {
749             addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
750                 (char*)c->argv[j]->ptr);
751             return;
752         }
753     }
754     addReply(c,shared.ok);
755 }
756 
757 /* This function puts a slave in the online state, and should be called just
758  * after a slave received the RDB file for the initial synchronization, and
759  * we are finally ready to send the incremental stream of commands.
760  *
761  * It does a few things:
762  *
763  * 1) Put the slave in ONLINE state (useless when the function is called
764  *    because state is already ONLINE but repl_put_online_on_ack is true).
765  * 2) Make sure the writable event is re-installed, since calling the SYNC
766  *    command disables it, so that we can accumulate output buffer without
767  *    sending it to the slave.
768  * 3) Update the count of good slaves. */
769 void putSlaveOnline(client *slave) {
770     slave->replstate = SLAVE_STATE_ONLINE;
771     slave->repl_put_online_on_ack = 0;
772     slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
773     if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
774         sendReplyToClient, slave) == AE_ERR) {
775         serverLog(LL_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
776         freeClient(slave);
777         return;
778     }
779     refreshGoodSlavesCount();
780     serverLog(LL_NOTICE,"Synchronization with slave %s succeeded",
781         replicationGetSlaveName(slave));
782 }
783 
784 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
785     client *slave = privdata;
786     UNUSED(el);
787     UNUSED(mask);
788     char buf[PROTO_IOBUF_LEN];
789     ssize_t nwritten, buflen;
790 
791     /* Before sending the RDB file, we send the preamble as configured by the
792      * replication process. Currently the preamble is just the bulk count of
793      * the file in the form "$<length>\r\n". */
794     if (slave->replpreamble) {
795         nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
796         if (nwritten == -1) {
797             serverLog(LL_VERBOSE,"Write error sending RDB preamble to slave: %s",
798                 strerror(errno));
799             freeClient(slave);
800             return;
801         }
802         server.stat_net_output_bytes += nwritten;
803         sdsrange(slave->replpreamble,nwritten,-1);
804         if (sdslen(slave->replpreamble) == 0) {
805             sdsfree(slave->replpreamble);
806             slave->replpreamble = NULL;
807             /* fall through sending data. */
808         } else {
809             return;
810         }
811     }
812 
813     /* If the preamble was already transfered, send the RDB bulk data. */
814     lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
815     buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
816     if (buflen <= 0) {
817         serverLog(LL_WARNING,"Read error sending DB to slave: %s",
818             (buflen == 0) ? "premature EOF" : strerror(errno));
819         freeClient(slave);
820         return;
821     }
822     if ((nwritten = write(fd,buf,buflen)) == -1) {
823         if (errno != EAGAIN) {
824             serverLog(LL_WARNING,"Write error sending DB to slave: %s",
825                 strerror(errno));
826             freeClient(slave);
827         }
828         return;
829     }
830     slave->repldboff += nwritten;
831     server.stat_net_output_bytes += nwritten;
832     if (slave->repldboff == slave->repldbsize) {
833         close(slave->repldbfd);
834         slave->repldbfd = -1;
835         aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
836         putSlaveOnline(slave);
837     }
838 }
839 
840 /* This function is called at the end of every background saving,
841  * or when the replication RDB transfer strategy is modified from
842  * disk to socket or the other way around.
843  *
844  * The goal of this function is to handle slaves waiting for a successful
845  * background saving in order to perform non-blocking synchronization, and
846  * to schedule a new BGSAVE if there are slaves that attached while a
847  * BGSAVE was in progress, but it was not a good one for replication (no
848  * other slave was accumulating differences).
849  *
850  * The argument bgsaveerr is C_OK if the background saving succeeded
851  * otherwise C_ERR is passed to the function.
852  * The 'type' argument is the type of the child that terminated
853  * (if it had a disk or socket target). */
854 void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
855     listNode *ln;
856     int startbgsave = 0;
857     int mincapa = -1;
858     listIter li;
859 
860     listRewind(server.slaves,&li);
861     while((ln = listNext(&li))) {
862         client *slave = ln->value;
863 
864         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
865             startbgsave = 1;
866             mincapa = (mincapa == -1) ? slave->slave_capa :
867                                         (mincapa & slave->slave_capa);
868         } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
869             struct redis_stat buf;
870 
871             /* If this was an RDB on disk save, we have to prepare to send
872              * the RDB from disk to the slave socket. Otherwise if this was
873              * already an RDB -> Slaves socket transfer, used in the case of
874              * diskless replication, our work is trivial, we can just put
875              * the slave online. */
876             if (type == RDB_CHILD_TYPE_SOCKET) {
877                 serverLog(LL_NOTICE,
878                     "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
879                         replicationGetSlaveName(slave));
880                 /* Note: we wait for a REPLCONF ACK message from slave in
881                  * order to really put it online (install the write handler
882                  * so that the accumulated data can be transfered). However
883                  * we change the replication state ASAP, since our slave
884                  * is technically online now. */
885                 slave->replstate = SLAVE_STATE_ONLINE;
886                 slave->repl_put_online_on_ack = 1;
887                 slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
888             } else {
889                 if (bgsaveerr != C_OK) {
890                     freeClient(slave);
891                     serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
892                     continue;
893                 }
894                 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
895                     redis_fstat(slave->repldbfd,&buf) == -1) {
896                     freeClient(slave);
897                     serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
898                     continue;
899                 }
900                 slave->repldboff = 0;
901                 slave->repldbsize = buf.st_size;
902                 slave->replstate = SLAVE_STATE_SEND_BULK;
903                 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
904                     (unsigned long long) slave->repldbsize);
905 
906                 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
907                 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
908                     freeClient(slave);
909                     continue;
910                 }
911             }
912         }
913     }
914     if (startbgsave) startBgsaveForReplication(mincapa);
915 }
916 
917 /* ----------------------------------- SLAVE -------------------------------- */
918 
919 /* Returns 1 if the given replication state is a handshake state,
920  * 0 otherwise. */
921 int slaveIsInHandshakeState(void) {
922     return server.repl_state >= REPL_STATE_RECEIVE_PONG &&
923            server.repl_state <= REPL_STATE_RECEIVE_PSYNC;
924 }
925 
926 /* Avoid the master to detect the slave is timing out while loading the
927  * RDB file in initial synchronization. We send a single newline character
928  * that is valid protocol but is guaranteed to either be sent entierly or
929  * not, since the byte is indivisible.
930  *
931  * The function is called in two contexts: while we flush the current
932  * data with emptyDb(), and while we load the new data received as an
933  * RDB file from the master. */
934 void replicationSendNewlineToMaster(void) {
935     static time_t newline_sent;
936     if (time(NULL) != newline_sent) {
937         newline_sent = time(NULL);
938         if (write(server.repl_transfer_s,"\n",1) == -1) {
939             /* Pinging back in this stage is best-effort. */
940         }
941     }
942 }
943 
/* Progress callback handed to emptyDb() while the old dataset is discarded
 * before loading the one received from the master. It keeps the master
 * link alive through replicationSendNewlineToMaster(). privdata is unused. */
void replicationEmptyDbCallback(void *privdata) {
    replicationSendNewlineToMaster();
    UNUSED(privdata);
}
950 
951 /* Once we have a link with the master and the synchroniziation was
952  * performed, this function materializes the master client we store
953  * at server.master, starting from the specified file descriptor. */
954 void replicationCreateMasterClient(int fd) {
955     server.master = createClient(fd);
956     server.master->flags |= CLIENT_MASTER;
957     server.master->authenticated = 1;
958     server.repl_state = REPL_STATE_CONNECTED;
959     server.master->reploff = server.repl_master_initial_offset;
960     memcpy(server.master->replrunid, server.repl_master_runid,
961         sizeof(server.repl_master_runid));
962     /* If master offset is set to -1, this master is old and is not
963      * PSYNC capable, so we flag it accordingly. */
964     if (server.master->reploff == -1)
965         server.master->flags |= CLIENT_PRE_PSYNC;
966 }
967 
968 /* Asynchronously read the SYNC payload we receive from a master */
969 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
970 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
971     char buf[4096];
972     ssize_t nread, readlen;
973     off_t left;
974     UNUSED(el);
975     UNUSED(privdata);
976     UNUSED(mask);
977 
978     /* Static vars used to hold the EOF mark, and the last bytes received
979      * form the server: when they match, we reached the end of the transfer. */
980     static char eofmark[CONFIG_RUN_ID_SIZE];
981     static char lastbytes[CONFIG_RUN_ID_SIZE];
982     static int usemark = 0;
983 
984     /* If repl_transfer_size == -1 we still have to read the bulk length
985      * from the master reply. */
986     if (server.repl_transfer_size == -1) {
987         if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
988             serverLog(LL_WARNING,
989                 "I/O error reading bulk count from MASTER: %s",
990                 strerror(errno));
991             goto error;
992         }
993 
994         if (buf[0] == '-') {
995             serverLog(LL_WARNING,
996                 "MASTER aborted replication with an error: %s",
997                 buf+1);
998             goto error;
999         } else if (buf[0] == '\0') {
1000             /* At this stage just a newline works as a PING in order to take
1001              * the connection live. So we refresh our last interaction
1002              * timestamp. */
1003             server.repl_transfer_lastio = server.unixtime;
1004             return;
1005         } else if (buf[0] != '$') {
1006             serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
1007             goto error;
1008         }
1009 
1010         /* There are two possible forms for the bulk payload. One is the
1011          * usual $<count> bulk format. The other is used for diskless transfers
1012          * when the master does not know beforehand the size of the file to
1013          * transfer. In the latter case, the following format is used:
1014          *
1015          * $EOF:<40 bytes delimiter>
1016          *
1017          * At the end of the file the announced delimiter is transmitted. The
1018          * delimiter is long and random enough that the probability of a
1019          * collision with the actual file content can be ignored. */
1020         if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
1021             usemark = 1;
1022             memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
1023             memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
1024             /* Set any repl_transfer_size to avoid entering this code path
1025              * at the next call. */
1026             server.repl_transfer_size = 0;
1027             serverLog(LL_NOTICE,
1028                 "MASTER <-> SLAVE sync: receiving streamed RDB from master");
1029         } else {
1030             usemark = 0;
1031             server.repl_transfer_size = strtol(buf+1,NULL,10);
1032             serverLog(LL_NOTICE,
1033                 "MASTER <-> SLAVE sync: receiving %lld bytes from master",
1034                 (long long) server.repl_transfer_size);
1035         }
1036         return;
1037     }
1038 
1039     /* Read bulk data */
1040     if (usemark) {
1041         readlen = sizeof(buf);
1042     } else {
1043         left = server.repl_transfer_size - server.repl_transfer_read;
1044         readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
1045     }
1046 
1047     nread = read(fd,buf,readlen);
1048     if (nread <= 0) {
1049         serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
1050             (nread == -1) ? strerror(errno) : "connection lost");
1051         cancelReplicationHandshake();
1052         return;
1053     }
1054     server.stat_net_input_bytes += nread;
1055 
1056     /* When a mark is used, we want to detect EOF asap in order to avoid
1057      * writing the EOF mark into the file... */
1058     int eof_reached = 0;
1059 
1060     if (usemark) {
1061         /* Update the last bytes array, and check if it matches our delimiter.*/
1062         if (nread >= CONFIG_RUN_ID_SIZE) {
1063             memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
1064         } else {
1065             int rem = CONFIG_RUN_ID_SIZE-nread;
1066             memmove(lastbytes,lastbytes+nread,rem);
1067             memcpy(lastbytes+rem,buf,nread);
1068         }
1069         if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
1070     }
1071 
1072     server.repl_transfer_lastio = server.unixtime;
1073     if (write(server.repl_transfer_fd,buf,nread) != nread) {
1074         serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
1075         goto error;
1076     }
1077     server.repl_transfer_read += nread;
1078 
1079     /* Delete the last 40 bytes from the file if we reached EOF. */
1080     if (usemark && eof_reached) {
1081         if (ftruncate(server.repl_transfer_fd,
1082             server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
1083         {
1084             serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
1085             goto error;
1086         }
1087     }
1088 
1089     /* Sync data on disk from time to time, otherwise at the end of the transfer
1090      * we may suffer a big delay as the memory buffers are copied into the
1091      * actual disk. */
1092     if (server.repl_transfer_read >=
1093         server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
1094     {
1095         off_t sync_size = server.repl_transfer_read -
1096                           server.repl_transfer_last_fsync_off;
1097         rdb_fsync_range(server.repl_transfer_fd,
1098             server.repl_transfer_last_fsync_off, sync_size);
1099         server.repl_transfer_last_fsync_off += sync_size;
1100     }
1101 
1102     /* Check if the transfer is now complete */
1103     if (!usemark) {
1104         if (server.repl_transfer_read == server.repl_transfer_size)
1105             eof_reached = 1;
1106     }
1107 
1108     if (eof_reached) {
1109         if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
1110             serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
1111             cancelReplicationHandshake();
1112             return;
1113         }
1114         serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
1115         signalFlushedDb(-1);
1116         emptyDb(replicationEmptyDbCallback);
1117         /* Before loading the DB into memory we need to delete the readable
1118          * handler, otherwise it will get called recursively since
1119          * rdbLoad() will call the event loop to process events from time to
1120          * time for non blocking loading. */
1121         aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
1122         serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
1123         if (rdbLoad(server.rdb_filename) != C_OK) {
1124             serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
1125             cancelReplicationHandshake();
1126             return;
1127         }
1128         /* Final setup of the connected slave <- master link */
1129         zfree(server.repl_transfer_tmpfile);
1130         close(server.repl_transfer_fd);
1131         replicationCreateMasterClient(server.repl_transfer_s);
1132         serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
1133         /* Restart the AOF subsystem now that we finished the sync. This
1134          * will trigger an AOF rewrite, and when done will start appending
1135          * to the new file. */
1136         if (server.aof_state != AOF_OFF) {
1137             int retry = 10;
1138 
1139             stopAppendOnly();
1140             while (retry-- && startAppendOnly() == C_ERR) {
1141                 serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
1142                 sleep(1);
1143             }
1144             if (!retry) {
1145                 serverLog(LL_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
1146                 exit(1);
1147             }
1148         }
1149     }
1150 
1151     return;
1152 
1153 error:
1154     cancelReplicationHandshake();
1155     return;
1156 }
1157 
1158 /* Send a synchronous command to the master. Used to send AUTH and
1159  * REPLCONF commands before starting the replication with SYNC.
1160  *
1161  * The command returns an sds string representing the result of the
1162  * operation. On error the first byte is a "-".
1163  */
1164 #define SYNC_CMD_READ (1<<0)
1165 #define SYNC_CMD_WRITE (1<<1)
1166 #define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE)
1167 char *sendSynchronousCommand(int flags, int fd, ...) {
1168 
1169     /* Create the command to send to the master, we use simple inline
1170      * protocol for simplicity as currently we only send simple strings. */
1171     if (flags & SYNC_CMD_WRITE) {
1172         char *arg;
1173         va_list ap;
1174         sds cmd = sdsempty();
1175         va_start(ap,fd);
1176 
1177         while(1) {
1178             arg = va_arg(ap, char*);
1179             if (arg == NULL) break;
1180 
1181             if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
1182             cmd = sdscat(cmd,arg);
1183         }
1184         cmd = sdscatlen(cmd,"\r\n",2);
1185 
1186         /* Transfer command to the server. */
1187         if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000)
1188             == -1)
1189         {
1190             sdsfree(cmd);
1191             return sdscatprintf(sdsempty(),"-Writing to master: %s",
1192                     strerror(errno));
1193         }
1194         sdsfree(cmd);
1195         va_end(ap);
1196     }
1197 
1198     /* Read the reply from the server. */
1199     if (flags & SYNC_CMD_READ) {
1200         char buf[256];
1201 
1202         if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000)
1203             == -1)
1204         {
1205             return sdscatprintf(sdsempty(),"-Reading from master: %s",
1206                     strerror(errno));
1207         }
1208         server.repl_transfer_lastio = server.unixtime;
1209         return sdsnew(buf);
1210     }
1211     return NULL;
1212 }
1213 
1214 /* Try a partial resynchronization with the master if we are about to reconnect.
1215  * If there is no cached master structure, at least try to issue a
1216  * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
1217  * command in order to obtain the master run id and the master replication
1218  * global offset.
1219  *
1220  * This function is designed to be called from syncWithMaster(), so the
1221  * following assumptions are made:
1222  *
1223  * 1) We pass the function an already connected socket "fd".
1224  * 2) This function does not close the file descriptor "fd". However in case
1225  *    of successful partial resynchronization, the function will reuse
1226  *    'fd' as file descriptor of the server.master client structure.
1227  *
1228  * The function is split in two halves: if read_reply is 0, the function
1229  * writes the PSYNC command on the socket, and a new function call is
1230  * needed, with read_reply set to 1, in order to read the reply of the
1231  * command. This is useful in order to support non blocking operations, so
1232  * that we write, return into the event loop, and read when there are data.
1233  *
1234  * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there
1235  * was a write error, or PSYNC_WAIT_REPLY to signal we need another call
1236  * with read_reply set to 1. However even when read_reply is set to 1
1237  * the function may return PSYNC_WAIT_REPLY again to signal there were
1238  * insufficient data to read to complete its work. We should re-enter
1239  * into the event loop and wait in such a case.
1240  *
1241  * The function returns:
1242  *
1243  * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue.
1244  * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
1245  *                   In this case the master run_id and global replication
1246  *                   offset is saved.
1247  * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
1248  *                      the caller should fall back to SYNC.
1249  * PSYNC_WRITE_ERR: There was an error writing the command to the socket.
1250  * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
1251  *
1252  * Notable side effects:
1253  *
1254  * 1) As a side effect of the function call the function removes the readable
1255  *    event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
1256  * 2) server.repl_master_initial_offset is set to the right value according
1257  *    to the master reply. This will be used to populate the 'server.master'
1258  *    structure replication offset.
1259  */
1260 
1261 #define PSYNC_WRITE_ERROR 0
1262 #define PSYNC_WAIT_REPLY 1
1263 #define PSYNC_CONTINUE 2
1264 #define PSYNC_FULLRESYNC 3
1265 #define PSYNC_NOT_SUPPORTED 4
1266 int slaveTryPartialResynchronization(int fd, int read_reply) {
1267     char *psync_runid;
1268     char psync_offset[32];
1269     sds reply;
1270 
1271     /* Writing half */
1272     if (!read_reply) {
1273         /* Initially set repl_master_initial_offset to -1 to mark the current
1274          * master run_id and offset as not valid. Later if we'll be able to do
1275          * a FULL resync using the PSYNC command we'll set the offset at the
1276          * right value, so that this information will be propagated to the
1277          * client structure representing the master into server.master. */
1278         server.repl_master_initial_offset = -1;
1279 
1280         if (server.cached_master) {
1281             psync_runid = server.cached_master->replrunid;
1282             snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
1283             serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
1284         } else {
1285             serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
1286             psync_runid = "?";
1287             memcpy(psync_offset,"-1",3);
1288         }
1289 
1290         /* Issue the PSYNC command */
1291         reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL);
1292         if (reply != NULL) {
1293             serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
1294             sdsfree(reply);
1295             aeDeleteFileEvent(server.el,fd,AE_READABLE);
1296             return PSYNC_WRITE_ERROR;
1297         }
1298         return PSYNC_WAIT_REPLY;
1299     }
1300 
1301     /* Reading half */
1302     reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
1303     if (sdslen(reply) == 0) {
1304         /* The master may send empty newlines after it receives PSYNC
1305          * and before to reply, just to keep the connection alive. */
1306         sdsfree(reply);
1307         return PSYNC_WAIT_REPLY;
1308     }
1309 
1310     aeDeleteFileEvent(server.el,fd,AE_READABLE);
1311 
1312     if (!strncmp(reply,"+FULLRESYNC",11)) {
1313         char *runid = NULL, *offset = NULL;
1314 
1315         /* FULL RESYNC, parse the reply in order to extract the run id
1316          * and the replication offset. */
1317         runid = strchr(reply,' ');
1318         if (runid) {
1319             runid++;
1320             offset = strchr(runid,' ');
1321             if (offset) offset++;
1322         }
1323         if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) {
1324             serverLog(LL_WARNING,
1325                 "Master replied with wrong +FULLRESYNC syntax.");
1326             /* This is an unexpected condition, actually the +FULLRESYNC
1327              * reply means that the master supports PSYNC, but the reply
1328              * format seems wrong. To stay safe we blank the master
1329              * runid to make sure next PSYNCs will fail. */
1330             memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1);
1331         } else {
1332             memcpy(server.repl_master_runid, runid, offset-runid-1);
1333             server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0';
1334             server.repl_master_initial_offset = strtoll(offset,NULL,10);
1335             serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
1336                 server.repl_master_runid,
1337                 server.repl_master_initial_offset);
1338         }
1339         /* We are going to full resync, discard the cached master structure. */
1340         replicationDiscardCachedMaster();
1341         sdsfree(reply);
1342         return PSYNC_FULLRESYNC;
1343     }
1344 
1345     if (!strncmp(reply,"+CONTINUE",9)) {
1346         /* Partial resync was accepted, set the replication state accordingly */
1347         serverLog(LL_NOTICE,
1348             "Successful partial resynchronization with master.");
1349         sdsfree(reply);
1350         replicationResurrectCachedMaster(fd);
1351         return PSYNC_CONTINUE;
1352     }
1353 
1354     /* If we reach this point we received either an error since the master does
1355      * not understand PSYNC, or an unexpected reply from the master.
1356      * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
1357 
1358     if (strncmp(reply,"-ERR",4)) {
1359         /* If it's not an error, log the unexpected event. */
1360         serverLog(LL_WARNING,
1361             "Unexpected reply to PSYNC from master: %s", reply);
1362     } else {
1363         serverLog(LL_NOTICE,
1364             "Master does not support PSYNC or is in "
1365             "error state (reply: %s)", reply);
1366     }
1367     sdsfree(reply);
1368     replicationDiscardCachedMaster();
1369     return PSYNC_NOT_SUPPORTED;
1370 }
1371 
1372 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
1373     char tmpfile[256], *err = NULL;
1374     int dfd, maxtries = 5;
1375     int sockerr = 0, psync_result;
1376     socklen_t errlen = sizeof(sockerr);
1377     UNUSED(el);
1378     UNUSED(privdata);
1379     UNUSED(mask);
1380 
1381     /* If this event fired after the user turned the instance into a master
1382      * with SLAVEOF NO ONE we must just return ASAP. */
1383     if (server.repl_state == REPL_STATE_NONE) {
1384         close(fd);
1385         return;
1386     }
1387 
1388     /* Check for errors in the socket. */
1389     if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
1390         sockerr = errno;
1391     if (sockerr) {
1392         serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
1393             strerror(sockerr));
1394         goto error;
1395     }
1396 
1397     /* Send a PING to check the master is able to reply without errors. */
1398     if (server.repl_state == REPL_STATE_CONNECTING) {
1399         serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
1400         /* Delete the writable event so that the readable event remains
1401          * registered and we can wait for the PONG reply. */
1402         aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
1403         server.repl_state = REPL_STATE_RECEIVE_PONG;
1404         /* Send the PING, don't check for errors at all, we have the timeout
1405          * that will take care about this. */
1406         err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
1407         if (err) goto write_error;
1408         return;
1409     }
1410 
1411     /* Receive the PONG command. */
1412     if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
1413         err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
1414 
1415         /* We accept only two replies as valid, a positive +PONG reply
1416          * (we just check for "+") or an authentication error.
1417          * Note that older versions of Redis replied with "operation not
1418          * permitted" instead of using a proper error code, so we test
1419          * both. */
1420         if (err[0] != '+' &&
1421             strncmp(err,"-NOAUTH",7) != 0 &&
1422             strncmp(err,"-ERR operation not permitted",28) != 0)
1423         {
1424             serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
1425             sdsfree(err);
1426             goto error;
1427         } else {
1428             serverLog(LL_NOTICE,
1429                 "Master replied to PING, replication can continue...");
1430         }
1431         sdsfree(err);
1432         server.repl_state = REPL_STATE_SEND_AUTH;
1433     }
1434 
1435     /* AUTH with the master if required. */
1436     if (server.repl_state == REPL_STATE_SEND_AUTH) {
1437         if (server.masterauth) {
1438             err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
1439             if (err) goto write_error;
1440             server.repl_state = REPL_STATE_RECEIVE_AUTH;
1441             return;
1442         } else {
1443             server.repl_state = REPL_STATE_SEND_PORT;
1444         }
1445     }
1446 
1447     /* Receive AUTH reply. */
1448     if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
1449         err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
1450         if (err[0] == '-') {
1451             serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
1452             sdsfree(err);
1453             goto error;
1454         }
1455         sdsfree(err);
1456         server.repl_state = REPL_STATE_SEND_PORT;
1457     }
1458 
1459     /* Set the slave port, so that Master's INFO command can list the
1460      * slave listening port correctly. */
1461     if (server.repl_state == REPL_STATE_SEND_PORT) {
1462         sds port = sdsfromlonglong(server.port);
1463         err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
1464                 "listening-port",port, NULL);
1465         sdsfree(port);
1466         if (err) goto write_error;
1467         sdsfree(err);
1468         server.repl_state = REPL_STATE_RECEIVE_PORT;
1469         return;
1470     }
1471 
1472     /* Receive REPLCONF listening-port reply. */
1473     if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
1474         err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
1475         /* Ignore the error if any, not all the Redis versions support
1476          * REPLCONF listening-port. */
1477         if (err[0] == '-') {
1478             serverLog(LL_NOTICE,"(Non critical) Master does not understand "
1479                                 "REPLCONF listening-port: %s", err);
1480         }
1481         sdsfree(err);
1482         server.repl_state = REPL_STATE_SEND_CAPA;
1483     }
1484 
1485     /* Inform the master of our capabilities. While we currently send
1486      * just one capability, it is possible to chain new capabilities here
1487      * in the form of REPLCONF capa X capa Y capa Z ...
1488      * The master will ignore capabilities it does not understand. */
1489     if (server.repl_state == REPL_STATE_SEND_CAPA) {
1490         err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
1491                 "capa","eof",NULL);
1492         if (err) goto write_error;
1493         sdsfree(err);
1494         server.repl_state = REPL_STATE_RECEIVE_CAPA;
1495         return;
1496     }
1497 
1498     /* Receive CAPA reply. */
1499     if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
1500         err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
1501         /* Ignore the error if any, not all the Redis versions support
1502          * REPLCONF capa. */
1503         if (err[0] == '-') {
1504             serverLog(LL_NOTICE,"(Non critical) Master does not understand "
1505                                   "REPLCONF capa: %s", err);
1506         }
1507         sdsfree(err);
1508         server.repl_state = REPL_STATE_SEND_PSYNC;
1509     }
1510 
1511     /* Try a partial resynchonization. If we don't have a cached master
1512      * slaveTryPartialResynchronization() will at least try to use PSYNC
1513      * to start a full resynchronization so that we get the master run id
1514      * and the global offset, to try a partial resync at the next
1515      * reconnection attempt. */
1516     if (server.repl_state == REPL_STATE_SEND_PSYNC) {
1517         if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
1518             err = sdsnew("Write error sending the PSYNC command.");
1519             goto write_error;
1520         }
1521         server.repl_state = REPL_STATE_RECEIVE_PSYNC;
1522         return;
1523     }
1524 
1525     /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
1526     if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
1527         serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
1528                              "state should be RECEIVE_PSYNC but is %d",
1529                              server.repl_state);
1530         goto error;
1531     }
1532 
1533     psync_result = slaveTryPartialResynchronization(fd,1);
1534     if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
1535 
1536     /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
1537      * uninstalling the read handler from the file descriptor. */
1538 
1539     if (psync_result == PSYNC_CONTINUE) {
1540         serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
1541         return;
1542     }
1543 
1544     /* PSYNC failed or is not supported: we want our slaves to resync with us
1545      * as well, if we have any (chained replication case). The mater may
1546      * transfer us an entirely different data set and we have no way to
1547      * incrementally feed our slaves after that. */
1548     disconnectSlaves(); /* Force our slaves to resync with us as well. */
1549     freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
1550 
1551     /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
1552      * and the server.repl_master_runid and repl_master_initial_offset are
1553      * already populated. */
1554     if (psync_result == PSYNC_NOT_SUPPORTED) {
1555         serverLog(LL_NOTICE,"Retrying with SYNC...");
1556         if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
1557             serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
1558                 strerror(errno));
1559             goto error;
1560         }
1561     }
1562 
1563     /* Prepare a suitable temp file for bulk transfer */
1564     while(maxtries--) {
1565         snprintf(tmpfile,256,
1566             "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
1567         dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
1568         if (dfd != -1) break;
1569         sleep(1);
1570     }
1571     if (dfd == -1) {
1572         serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
1573         goto error;
1574     }
1575 
1576     /* Setup the non blocking download of the bulk file. */
1577     if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
1578             == AE_ERR)
1579     {
1580         serverLog(LL_WARNING,
1581             "Can't create readable event for SYNC: %s (fd=%d)",
1582             strerror(errno),fd);
1583         goto error;
1584     }
1585 
1586     server.repl_state = REPL_STATE_TRANSFER;
1587     server.repl_transfer_size = -1;
1588     server.repl_transfer_read = 0;
1589     server.repl_transfer_last_fsync_off = 0;
1590     server.repl_transfer_fd = dfd;
1591     server.repl_transfer_lastio = server.unixtime;
1592     server.repl_transfer_tmpfile = zstrdup(tmpfile);
1593     return;
1594 
1595 error:
1596     aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
1597     close(fd);
1598     server.repl_transfer_s = -1;
1599     server.repl_state = REPL_STATE_CONNECT;
1600     return;
1601 
1602 write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
1603     serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
1604     sdsfree(err);
1605     goto error;
1606 }
1607 
1608 int connectWithMaster(void) {
1609     int fd;
1610 
1611     fd = anetTcpNonBlockBestEffortBindConnect(NULL,
1612         server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
1613     if (fd == -1) {
1614         serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
1615             strerror(errno));
1616         return C_ERR;
1617     }
1618 
1619     if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
1620             AE_ERR)
1621     {
1622         close(fd);
1623         serverLog(LL_WARNING,"Can't create readable event for SYNC");
1624         return C_ERR;
1625     }
1626 
1627     server.repl_transfer_lastio = server.unixtime;
1628     server.repl_transfer_s = fd;
1629     server.repl_state = REPL_STATE_CONNECTING;
1630     return C_OK;
1631 }
1632 
1633 /* This function can be called when a non blocking connection is currently
1634  * in progress to undo it.
1635  * Never call this function directly, use cancelReplicationHandshake() instead.
1636  */
1637 void undoConnectWithMaster(void) {
1638     int fd = server.repl_transfer_s;
1639 
1640     aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
1641     close(fd);
1642     server.repl_transfer_s = -1;
1643 }
1644 
1645 /* Abort the async download of the bulk dataset while SYNC-ing with master.
1646  * Never call this function directly, use cancelReplicationHandshake() instead.
1647  */
1648 void replicationAbortSyncTransfer(void) {
1649     serverAssert(server.repl_state == REPL_STATE_TRANSFER);
1650     undoConnectWithMaster();
1651     close(server.repl_transfer_fd);
1652     unlink(server.repl_transfer_tmpfile);
1653     zfree(server.repl_transfer_tmpfile);
1654 }
1655 
1656 /* This function aborts a non blocking replication attempt if there is one
1657  * in progress, by canceling the non-blocking connect attempt or
1658  * the initial bulk transfer.
1659  *
1660  * If there was a replication handshake in progress 1 is returned and
1661  * the replication state (server.repl_state) set to REPL_STATE_CONNECT.
1662  *
1663  * Otherwise zero is returned and no operation is perforemd at all. */
1664 int cancelReplicationHandshake(void) {
1665     if (server.repl_state == REPL_STATE_TRANSFER) {
1666         replicationAbortSyncTransfer();
1667         server.repl_state = REPL_STATE_CONNECT;
1668     } else if (server.repl_state == REPL_STATE_CONNECTING ||
1669                slaveIsInHandshakeState())
1670     {
1671         undoConnectWithMaster();
1672         server.repl_state = REPL_STATE_CONNECT;
1673     } else {
1674         return 0;
1675     }
1676     return 1;
1677 }
1678 
1679 /* Set replication to the specified master address and port. */
1680 void replicationSetMaster(char *ip, int port) {
1681     sdsfree(server.masterhost);
1682     server.masterhost = sdsnew(ip);
1683     server.masterport = port;
1684     if (server.master) freeClient(server.master);
1685     disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
1686     disconnectSlaves(); /* Force our slaves to resync with us as well. */
1687     replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
1688     freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
1689     cancelReplicationHandshake();
1690     server.repl_state = REPL_STATE_CONNECT;
1691     server.master_repl_offset = 0;
1692     server.repl_down_since = 0;
1693 }
1694 
1695 /* Cancel replication, setting the instance as a master itself. */
1696 void replicationUnsetMaster(void) {
1697     if (server.masterhost == NULL) return; /* Nothing to do. */
1698     sdsfree(server.masterhost);
1699     server.masterhost = NULL;
1700     if (server.master) {
1701         if (listLength(server.slaves) == 0) {
1702             /* If this instance is turned into a master and there are no
1703              * slaves, it inherits the replication offset from the master.
1704              * Under certain conditions this makes replicas comparable by
1705              * replication offset to understand what is the most updated. */
1706             server.master_repl_offset = server.master->reploff;
1707             freeReplicationBacklog();
1708         }
1709         freeClient(server.master);
1710     }
1711     replicationDiscardCachedMaster();
1712     cancelReplicationHandshake();
1713     server.repl_state = REPL_STATE_NONE;
1714 }
1715 
1716 /* This function is called when the slave lose the connection with the
1717  * master into an unexpected way. */
1718 void replicationHandleMasterDisconnection(void) {
1719     server.master = NULL;
1720     server.repl_state = REPL_STATE_CONNECT;
1721     server.repl_down_since = server.unixtime;
1722     /* We lost connection with our master, don't disconnect slaves yet,
1723      * maybe we'll be able to PSYNC with our master later. We'll disconnect
1724      * the slaves only if we'll have to do a full resync with our master. */
1725 }
1726 
1727 void slaveofCommand(client *c) {
1728     /* SLAVEOF is not allowed in cluster mode as replication is automatically
1729      * configured using the current address of the master node. */
1730     if (server.cluster_enabled) {
1731         addReplyError(c,"SLAVEOF not allowed in cluster mode.");
1732         return;
1733     }
1734 
1735     /* The special host/port combination "NO" "ONE" turns the instance
1736      * into a master. Otherwise the new master address is set. */
1737     if (!strcasecmp(c->argv[1]->ptr,"no") &&
1738         !strcasecmp(c->argv[2]->ptr,"one")) {
1739         if (server.masterhost) {
1740             replicationUnsetMaster();
1741             sds client = catClientInfoString(sdsempty(),c);
1742             serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
1743                 client);
1744             sdsfree(client);
1745         }
1746     } else {
1747         long port;
1748 
1749         if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
1750             return;
1751 
1752         /* Check if we are already attached to the specified slave */
1753         if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
1754             && server.masterport == port) {
1755             serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
1756             addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
1757             return;
1758         }
1759         /* There was no previous master or the user specified a different one,
1760          * we can continue. */
1761         replicationSetMaster(c->argv[1]->ptr, port);
1762         sds client = catClientInfoString(sdsempty(),c);
1763         serverLog(LL_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')",
1764             server.masterhost, server.masterport, client);
1765         sdsfree(client);
1766     }
1767     addReply(c,shared.ok);
1768 }
1769 
1770 /* ROLE command: provide information about the role of the instance
1771  * (master or slave) and additional information related to replication
1772  * in an easy to process format. */
1773 void roleCommand(client *c) {
1774     if (server.masterhost == NULL) {
1775         listIter li;
1776         listNode *ln;
1777         void *mbcount;
1778         int slaves = 0;
1779 
1780         addReplyMultiBulkLen(c,3);
1781         addReplyBulkCBuffer(c,"master",6);
1782         addReplyLongLong(c,server.master_repl_offset);
1783         mbcount = addDeferredMultiBulkLength(c);
1784         listRewind(server.slaves,&li);
1785         while((ln = listNext(&li))) {
1786             client *slave = ln->value;
1787             char ip[NET_IP_STR_LEN];
1788 
1789             if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) continue;
1790             if (slave->replstate != SLAVE_STATE_ONLINE) continue;
1791             addReplyMultiBulkLen(c,3);
1792             addReplyBulkCString(c,ip);
1793             addReplyBulkLongLong(c,slave->slave_listening_port);
1794             addReplyBulkLongLong(c,slave->repl_ack_off);
1795             slaves++;
1796         }
1797         setDeferredMultiBulkLength(c,mbcount,slaves);
1798     } else {
1799         char *slavestate = NULL;
1800 
1801         addReplyMultiBulkLen(c,5);
1802         addReplyBulkCBuffer(c,"slave",5);
1803         addReplyBulkCString(c,server.masterhost);
1804         addReplyLongLong(c,server.masterport);
1805         if (slaveIsInHandshakeState()) {
1806             slavestate = "handshake";
1807         } else {
1808             switch(server.repl_state) {
1809             case REPL_STATE_NONE: slavestate = "none"; break;
1810             case REPL_STATE_CONNECT: slavestate = "connect"; break;
1811             case REPL_STATE_CONNECTING: slavestate = "connecting"; break;
1812             case REPL_STATE_TRANSFER: slavestate = "sync"; break;
1813             case REPL_STATE_CONNECTED: slavestate = "connected"; break;
1814             default: slavestate = "unknown"; break;
1815             }
1816         }
1817         addReplyBulkCString(c,slavestate);
1818         addReplyLongLong(c,server.master ? server.master->reploff : -1);
1819     }
1820 }
1821 
1822 /* Send a REPLCONF ACK command to the master to inform it about the current
1823  * processed offset. If we are not connected with a master, the command has
1824  * no effects. */
1825 void replicationSendAck(void) {
1826     client *c = server.master;
1827 
1828     if (c != NULL) {
1829         c->flags |= CLIENT_MASTER_FORCE_REPLY;
1830         addReplyMultiBulkLen(c,3);
1831         addReplyBulkCString(c,"REPLCONF");
1832         addReplyBulkCString(c,"ACK");
1833         addReplyBulkLongLong(c,c->reploff);
1834         c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
1835     }
1836 }
1837 
1838 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
1839 
1840 /* In order to implement partial synchronization we need to be able to cache
1841  * our master's client structure after a transient disconnection.
1842  * It is cached into server.cached_master and flushed away using the following
1843  * functions. */
1844 
1845 /* This function is called by freeClient() in order to cache the master
1846  * client structure instead of destryoing it. freeClient() will return
1847  * ASAP after this function returns, so every action needed to avoid problems
1848  * with a client that is really "suspended" has to be done by this function.
1849  *
1850  * The other functions that will deal with the cached master are:
1851  *
1852  * replicationDiscardCachedMaster() that will make sure to kill the client
1853  * as for some reason we don't want to use it in the future.
1854  *
1855  * replicationResurrectCachedMaster() that is used after a successful PSYNC
1856  * handshake in order to reactivate the cached master.
1857  */
1858 void replicationCacheMaster(client *c) {
1859     serverAssert(server.master != NULL && server.cached_master == NULL);
1860     serverLog(LL_NOTICE,"Caching the disconnected master state.");
1861 
1862     /* Unlink the client from the server structures. */
1863     unlinkClient(c);
1864 
1865     /* Save the master. Server.master will be set to null later by
1866      * replicationHandleMasterDisconnection(). */
1867     server.cached_master = server.master;
1868 
1869     /* Invalidate the Peer ID cache. */
1870     if (c->peerid) {
1871         sdsfree(c->peerid);
1872         c->peerid = NULL;
1873     }
1874 
1875     /* Caching the master happens instead of the actual freeClient() call,
1876      * so make sure to adjust the replication state. This function will
1877      * also set server.master to NULL. */
1878     replicationHandleMasterDisconnection();
1879 }
1880 
1881 /* Free a cached master, called when there are no longer the conditions for
1882  * a partial resync on reconnection. */
1883 void replicationDiscardCachedMaster(void) {
1884     if (server.cached_master == NULL) return;
1885 
1886     serverLog(LL_NOTICE,"Discarding previously cached master state.");
1887     server.cached_master->flags &= ~CLIENT_MASTER;
1888     freeClient(server.cached_master);
1889     server.cached_master = NULL;
1890 }
1891 
1892 /* Turn the cached master into the current master, using the file descriptor
1893  * passed as argument as the socket for the new master.
1894  *
1895  * This function is called when successfully setup a partial resynchronization
1896  * so the stream of data that we'll receive will start from were this
1897  * master left. */
1898 void replicationResurrectCachedMaster(int newfd) {
1899     server.master = server.cached_master;
1900     server.cached_master = NULL;
1901     server.master->fd = newfd;
1902     server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
1903     server.master->authenticated = 1;
1904     server.master->lastinteraction = server.unixtime;
1905     server.repl_state = REPL_STATE_CONNECTED;
1906 
1907     /* Re-add to the list of clients. */
1908     listAddNodeTail(server.clients,server.master);
1909     if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
1910                           readQueryFromClient, server.master)) {
1911         serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
1912         freeClientAsync(server.master); /* Close ASAP. */
1913     }
1914 
1915     /* We may also need to install the write handler as well if there is
1916      * pending data in the write buffers. */
1917     if (clientHasPendingReplies(server.master)) {
1918         if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
1919                           sendReplyToClient, server.master)) {
1920             serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
1921             freeClientAsync(server.master); /* Close ASAP. */
1922         }
1923     }
1924 }
1925 
1926 /* ------------------------- MIN-SLAVES-TO-WRITE  --------------------------- */
1927 
1928 /* This function counts the number of slaves with lag <= min-slaves-max-lag.
1929  * If the option is active, the server will prevent writes if there are not
1930  * enough connected slaves with the specified lag (or less). */
1931 void refreshGoodSlavesCount(void) {
1932     listIter li;
1933     listNode *ln;
1934     int good = 0;
1935 
1936     if (!server.repl_min_slaves_to_write ||
1937         !server.repl_min_slaves_max_lag) return;
1938 
1939     listRewind(server.slaves,&li);
1940     while((ln = listNext(&li))) {
1941         client *slave = ln->value;
1942         time_t lag = server.unixtime - slave->repl_ack_time;
1943 
1944         if (slave->replstate == SLAVE_STATE_ONLINE &&
1945             lag <= server.repl_min_slaves_max_lag) good++;
1946     }
1947     server.repl_good_slaves_count = good;
1948 }
1949 
1950 /* ----------------------- REPLICATION SCRIPT CACHE --------------------------
1951  * The goal of this code is to keep track of scripts already sent to every
1952  * connected slave, in order to be able to replicate EVALSHA as it is without
1953  * translating it to EVAL every time it is possible.
1954  *
1955  * We use a capped collection implemented by a hash table for fast lookup
1956  * of scripts we can send as EVALSHA, plus a linked list that is used for
1957  * eviction of the oldest entry when the max number of items is reached.
1958  *
1959  * We don't care about taking a different cache for every different slave
1960  * since to fill the cache again is not very costly, the goal of this code
 * is to avoid that the same big script is transmitted a big number of times
1962  * per second wasting bandwidth and processor speed, but it is not a problem
1963  * if we need to rebuild the cache from scratch from time to time, every used
1964  * script will need to be transmitted a single time to reappear in the cache.
1965  *
1966  * This is how the system works:
1967  *
1968  * 1) Every time a new slave connects, we flush the whole script cache.
1969  * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
1970  *    trying to convert EVAL into EVALSHA specifically for slaves.
 * 3) Every time we transmit a script as EVAL to the slaves, we also add the
1972  *    corresponding SHA1 of the script into the cache as we are sure every
1973  *    slave knows about the script starting from now.
1974  * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
1975  *    and at the same time flush the script cache.
1976  * 5) When the last slave disconnects, flush the cache.
1977  * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
1978  *    in the master sometimes.
1979  */
1980 
1981 /* Initialize the script cache, only called at startup. */
1982 void replicationScriptCacheInit(void) {
1983     server.repl_scriptcache_size = 10000;
1984     server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
1985     server.repl_scriptcache_fifo = listCreate();
1986 }
1987 
1988 /* Empty the script cache. Should be called every time we are no longer sure
1989  * that every slave knows about all the scripts in our set, or when the
1990  * current AOF "context" is no longer aware of the script. In general we
1991  * should flush the cache:
1992  *
1993  * 1) Every time a new slave reconnects to this master and performs a
1994  *    full SYNC (PSYNC does not require flushing).
1995  * 2) Every time an AOF rewrite is performed.
1996  * 3) Every time we are left without slaves at all, and AOF is off, in order
1997  *    to reclaim otherwise unused memory.
1998  */
1999 void replicationScriptCacheFlush(void) {
2000     dictEmpty(server.repl_scriptcache_dict,NULL);
2001     listRelease(server.repl_scriptcache_fifo);
2002     server.repl_scriptcache_fifo = listCreate();
2003 }
2004 
2005 /* Add an entry into the script cache, if we reach max number of entries the
2006  * oldest is removed from the list. */
2007 void replicationScriptCacheAdd(sds sha1) {
2008     int retval;
2009     sds key = sdsdup(sha1);
2010 
2011     /* Evict oldest. */
2012     if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
2013     {
2014         listNode *ln = listLast(server.repl_scriptcache_fifo);
2015         sds oldest = listNodeValue(ln);
2016 
2017         retval = dictDelete(server.repl_scriptcache_dict,oldest);
2018         serverAssert(retval == DICT_OK);
2019         listDelNode(server.repl_scriptcache_fifo,ln);
2020     }
2021 
2022     /* Add current. */
2023     retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
2024     listAddNodeHead(server.repl_scriptcache_fifo,key);
2025     serverAssert(retval == DICT_OK);
2026 }
2027 
2028 /* Returns non-zero if the specified entry exists inside the cache, that is,
2029  * if all the slaves are aware of this script SHA1. */
2030 int replicationScriptCacheExists(sds sha1) {
2031     return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
2032 }
2033 
2034 /* ----------------------- SYNCHRONOUS REPLICATION --------------------------
2035  * Redis synchronous replication design can be summarized in points:
2036  *
2037  * - Redis masters have a global replication offset, used by PSYNC.
 * - Masters increment the offset every time new commands are sent to slaves.
2039  * - Slaves ping back masters with the offset processed so far.
2040  *
2041  * So synchronous replication adds a new WAIT command in the form:
2042  *
2043  *   WAIT <num_replicas> <milliseconds_timeout>
2044  *
2045  * That returns the number of replicas that processed the query when
2046  * we finally have at least num_replicas, or when the timeout was
2047  * reached.
2048  *
2049  * The command is implemented in this way:
2050  *
2051  * - Every time a client processes a command, we remember the replication
2052  *   offset after sending that command to the slaves.
2053  * - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
2054  *   The client is blocked at the same time (see blocked.c).
2055  * - Once we receive enough ACKs for a given offset or when the timeout
2056  *   is reached, the WAIT command is unblocked and the reply sent to the
2057  *   client.
2058  */
2059 
2060 /* This just set a flag so that we broadcast a REPLCONF GETACK command
2061  * to all the slaves in the beforeSleep() function. Note that this way
2062  * we "group" all the clients that want to wait for synchronouns replication
2063  * in a given event loop iteration, and send a single GETACK for them all. */
2064 void replicationRequestAckFromSlaves(void) {
2065     server.get_ack_from_slaves = 1;
2066 }
2067 
2068 /* Return the number of slaves that already acknowledged the specified
2069  * replication offset. */
2070 int replicationCountAcksByOffset(long long offset) {
2071     listIter li;
2072     listNode *ln;
2073     int count = 0;
2074 
2075     listRewind(server.slaves,&li);
2076     while((ln = listNext(&li))) {
2077         client *slave = ln->value;
2078 
2079         if (slave->replstate != SLAVE_STATE_ONLINE) continue;
2080         if (slave->repl_ack_off >= offset) count++;
2081     }
2082     return count;
2083 }
2084 
2085 /* WAIT for N replicas to acknowledge the processing of our latest
2086  * write command (and all the previous commands). */
2087 void waitCommand(client *c) {
2088     mstime_t timeout;
2089     long numreplicas, ackreplicas;
2090     long long offset = c->woff;
2091 
2092     /* Argument parsing. */
2093     if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
2094         return;
2095     if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
2096         != C_OK) return;
2097 
2098     /* First try without blocking at all. */
2099     ackreplicas = replicationCountAcksByOffset(c->woff);
2100     if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) {
2101         addReplyLongLong(c,ackreplicas);
2102         return;
2103     }
2104 
2105     /* Otherwise block the client and put it into our list of clients
2106      * waiting for ack from slaves. */
2107     c->bpop.timeout = timeout;
2108     c->bpop.reploffset = offset;
2109     c->bpop.numreplicas = numreplicas;
2110     listAddNodeTail(server.clients_waiting_acks,c);
2111     blockClient(c,BLOCKED_WAIT);
2112 
2113     /* Make sure that the server will send an ACK request to all the slaves
2114      * before returning to the event loop. */
2115     replicationRequestAckFromSlaves();
2116 }
2117 
2118 /* This is called by unblockClient() to perform the blocking op type
2119  * specific cleanup. We just remove the client from the list of clients
2120  * waiting for replica acks. Never call it directly, call unblockClient()
2121  * instead. */
2122 void unblockClientWaitingReplicas(client *c) {
2123     listNode *ln = listSearchKey(server.clients_waiting_acks,c);
2124     serverAssert(ln != NULL);
2125     listDelNode(server.clients_waiting_acks,ln);
2126 }
2127 
2128 /* Check if there are clients blocked in WAIT that can be unblocked since
2129  * we received enough ACKs from slaves. */
2130 void processClientsWaitingReplicas(void) {
2131     long long last_offset = 0;
2132     int last_numreplicas = 0;
2133 
2134     listIter li;
2135     listNode *ln;
2136 
2137     listRewind(server.clients_waiting_acks,&li);
2138     while((ln = listNext(&li))) {
2139         client *c = ln->value;
2140 
2141         /* Every time we find a client that is satisfied for a given
2142          * offset and number of replicas, we remember it so the next client
2143          * may be unblocked without calling replicationCountAcksByOffset()
2144          * if the requested offset / replicas were equal or less. */
2145         if (last_offset && last_offset > c->bpop.reploffset &&
2146                            last_numreplicas > c->bpop.numreplicas)
2147         {
2148             unblockClient(c);
2149             addReplyLongLong(c,last_numreplicas);
2150         } else {
2151             int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
2152 
2153             if (numreplicas >= c->bpop.numreplicas) {
2154                 last_offset = c->bpop.reploffset;
2155                 last_numreplicas = numreplicas;
2156                 unblockClient(c);
2157                 addReplyLongLong(c,numreplicas);
2158             }
2159         }
2160     }
2161 }
2162 
2163 /* Return the slave replication offset for this instance, that is
2164  * the offset for which we already processed the master replication stream. */
2165 long long replicationGetSlaveOffset(void) {
2166     long long offset = 0;
2167 
2168     if (server.masterhost != NULL) {
2169         if (server.master) {
2170             offset = server.master->reploff;
2171         } else if (server.cached_master) {
2172             offset = server.cached_master->reploff;
2173         }
2174     }
2175     /* offset may be -1 when the master does not support it at all, however
2176      * this function is designed to return an offset that can express the
2177      * amount of data processed by the master, so we return a positive
2178      * integer. */
2179     if (offset < 0) offset = 0;
2180     return offset;
2181 }
2182 
2183 /* --------------------------- REPLICATION CRON  ---------------------------- */
2184 
2185 /* Replication cron function, called 1 time per second. */
2186 void replicationCron(void) {
2187     static long long replication_cron_loops = 0;
2188 
2189     /* Non blocking connection timeout? */
2190     if (server.masterhost &&
2191         (server.repl_state == REPL_STATE_CONNECTING ||
2192          slaveIsInHandshakeState()) &&
2193          (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
2194     {
2195         serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
2196         cancelReplicationHandshake();
2197     }
2198 
2199     /* Bulk transfer I/O timeout? */
2200     if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
2201         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
2202     {
2203         serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
2204         cancelReplicationHandshake();
2205     }
2206 
2207     /* Timed out master when we are an already connected slave? */
2208     if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
2209         (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
2210     {
2211         serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
2212         freeClient(server.master);
2213     }
2214 
2215     /* Check if we should connect to a MASTER */
2216     if (server.repl_state == REPL_STATE_CONNECT) {
2217         serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
2218             server.masterhost, server.masterport);
2219         if (connectWithMaster() == C_OK) {
2220             serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
2221         }
2222     }
2223 
2224     /* Send ACK to master from time to time.
2225      * Note that we do not send periodic acks to masters that don't
2226      * support PSYNC and replication offsets. */
2227     if (server.masterhost && server.master &&
2228         !(server.master->flags & CLIENT_PRE_PSYNC))
2229         replicationSendAck();
2230 
2231     /* If we have attached slaves, PING them from time to time.
2232      * So slaves can implement an explicit timeout to masters, and will
2233      * be able to detect a link disconnection even if the TCP connection
2234      * will not actually go down. */
2235     listIter li;
2236     listNode *ln;
2237     robj *ping_argv[1];
2238 
2239     /* First, send PING according to ping_slave_period. */
2240     if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {
2241         ping_argv[0] = createStringObject("PING",4);
2242         replicationFeedSlaves(server.slaves, server.slaveseldb,
2243             ping_argv, 1);
2244         decrRefCount(ping_argv[0]);
2245     }
2246 
2247     /* Second, send a newline to all the slaves in pre-synchronization
2248      * stage, that is, slaves waiting for the master to create the RDB file.
2249      * The newline will be ignored by the slave but will refresh the
2250      * last-io timer preventing a timeout. In this case we ignore the
2251      * ping period and refresh the connection once per second since certain
2252      * timeouts are set at a few seconds (example: PSYNC response). */
2253     listRewind(server.slaves,&li);
2254     while((ln = listNext(&li))) {
2255         client *slave = ln->value;
2256 
2257         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
2258             (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
2259              server.rdb_child_type != RDB_CHILD_TYPE_SOCKET))
2260         {
2261             if (write(slave->fd, "\n", 1) == -1) {
2262                 /* Don't worry, it's just a ping. */
2263             }
2264         }
2265     }
2266 
2267     /* Disconnect timedout slaves. */
2268     if (listLength(server.slaves)) {
2269         listIter li;
2270         listNode *ln;
2271 
2272         listRewind(server.slaves,&li);
2273         while((ln = listNext(&li))) {
2274             client *slave = ln->value;
2275 
2276             if (slave->replstate != SLAVE_STATE_ONLINE) continue;
2277             if (slave->flags & CLIENT_PRE_PSYNC) continue;
2278             if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
2279             {
2280                 serverLog(LL_WARNING, "Disconnecting timedout slave: %s",
2281                     replicationGetSlaveName(slave));
2282                 freeClient(slave);
2283             }
2284         }
2285     }
2286 
2287     /* If we have no attached slaves and there is a replication backlog
2288      * using memory, free it after some (configured) time. */
2289     if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
2290         server.repl_backlog)
2291     {
2292         time_t idle = server.unixtime - server.repl_no_slaves_since;
2293 
2294         if (idle > server.repl_backlog_time_limit) {
2295             freeReplicationBacklog();
2296             serverLog(LL_NOTICE,
2297                 "Replication backlog freed after %d seconds "
2298                 "without connected slaves.",
2299                 (int) server.repl_backlog_time_limit);
2300         }
2301     }
2302 
2303     /* If AOF is disabled and we no longer have attached slaves, we can
2304      * free our Replication Script Cache as there is no need to propagate
2305      * EVALSHA at all. */
2306     if (listLength(server.slaves) == 0 &&
2307         server.aof_state == AOF_OFF &&
2308         listLength(server.repl_scriptcache_fifo) != 0)
2309     {
2310         replicationScriptCacheFlush();
2311     }
2312 
2313     /* Start a BGSAVE good for replication if we have slaves in
2314      * WAIT_BGSAVE_START state.
2315      *
2316      * In case of diskless replication, we make sure to wait the specified
2317      * number of seconds (according to configuration) so that other slaves
2318      * have the time to arrive before we start streaming. */
2319     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
2320         time_t idle, max_idle = 0;
2321         int slaves_waiting = 0;
2322         int mincapa = -1;
2323         listNode *ln;
2324         listIter li;
2325 
2326         listRewind(server.slaves,&li);
2327         while((ln = listNext(&li))) {
2328             client *slave = ln->value;
2329             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
2330                 idle = server.unixtime - slave->lastinteraction;
2331                 if (idle > max_idle) max_idle = idle;
2332                 slaves_waiting++;
2333                 mincapa = (mincapa == -1) ? slave->slave_capa :
2334                                             (mincapa & slave->slave_capa);
2335             }
2336         }
2337 
2338         if (slaves_waiting &&
2339             (!server.repl_diskless_sync ||
2340              max_idle > server.repl_diskless_sync_delay))
2341         {
2342             /* Start the BGSAVE. The called function may start a
2343              * BGSAVE with socket target or disk target depending on the
2344              * configuration and slaves capabilities. */
2345             startBgsaveForReplication(mincapa);
2346         }
2347     }
2348 
2349     /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
2350     refreshGoodSlavesCount();
2351     replication_cron_loops++; /* Incremented with frequency 1 HZ. */
2352 }
2353