xref: /redis-3.2.3/src/replication.c (revision dffbbb5a)
1 /* Asynchronous replication implementation.
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 
32 #include "redis.h"
33 
34 #include <sys/time.h>
35 #include <unistd.h>
36 #include <fcntl.h>
37 #include <sys/socket.h>
38 #include <sys/stat.h>
39 
40 void replicationDiscardCachedMaster(void);
41 void replicationResurrectCachedMaster(int newfd);
42 void replicationSendAck(void);
43 void putSlaveOnline(redisClient *slave);
44 
45 /* --------------------------- Utility functions ---------------------------- */
46 
47 /* Return the pointer to a string representing the slave ip:listening_port
48  * pair. Mostly useful for logging, since we want to log a slave using its
49  * IP address and it's listening port which is more clear for the user, for
50  * example: "Closing connection with slave 10.1.2.3:6380". */
51 char *replicationGetSlaveName(redisClient *c) {
52     static char buf[REDIS_PEER_ID_LEN];
53     char ip[REDIS_IP_STR_LEN];
54 
55     ip[0] = '\0';
56     buf[0] = '\0';
57     if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) {
58         if (c->slave_listening_port)
59             anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
60         else
61             snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip);
62     } else {
63         snprintf(buf,sizeof(buf),"client id #%llu",
64             (unsigned long long) c->id);
65     }
66     return buf;
67 }
68 
69 /* ---------------------------------- MASTER -------------------------------- */
70 
71 void createReplicationBacklog(void) {
72     redisAssert(server.repl_backlog == NULL);
73     server.repl_backlog = zmalloc(server.repl_backlog_size);
74     server.repl_backlog_histlen = 0;
75     server.repl_backlog_idx = 0;
76     /* When a new backlog buffer is created, we increment the replication
77      * offset by one to make sure we'll not be able to PSYNC with any
78      * previous slave. This is needed because we avoid incrementing the
79      * master_repl_offset if no backlog exists nor slaves are attached. */
80     server.master_repl_offset++;
81 
82     /* We don't have any data inside our buffer, but virtually the first
83      * byte we have is the next byte that will be generated for the
84      * replication stream. */
85     server.repl_backlog_off = server.master_repl_offset+1;
86 }
87 
88 /* This function is called when the user modifies the replication backlog
89  * size at runtime. It is up to the function to both update the
90  * server.repl_backlog_size and to resize the buffer and setup it so that
91  * it contains the same data as the previous one (possibly less data, but
92  * the most recent bytes, or the same data and more free space in case the
93  * buffer is enlarged). */
94 void resizeReplicationBacklog(long long newsize) {
95     if (newsize < REDIS_REPL_BACKLOG_MIN_SIZE)
96         newsize = REDIS_REPL_BACKLOG_MIN_SIZE;
97     if (server.repl_backlog_size == newsize) return;
98 
99     server.repl_backlog_size = newsize;
100     if (server.repl_backlog != NULL) {
101         /* What we actually do is to flush the old buffer and realloc a new
102          * empty one. It will refill with new data incrementally.
103          * The reason is that copying a few gigabytes adds latency and even
104          * worse often we need to alloc additional space before freeing the
105          * old buffer. */
106         zfree(server.repl_backlog);
107         server.repl_backlog = zmalloc(server.repl_backlog_size);
108         server.repl_backlog_histlen = 0;
109         server.repl_backlog_idx = 0;
110         /* Next byte we have is... the next since the buffer is empty. */
111         server.repl_backlog_off = server.master_repl_offset+1;
112     }
113 }
114 
115 void freeReplicationBacklog(void) {
116     redisAssert(listLength(server.slaves) == 0);
117     zfree(server.repl_backlog);
118     server.repl_backlog = NULL;
119 }
120 
121 /* Add data to the replication backlog.
122  * This function also increments the global replication offset stored at
123  * server.master_repl_offset, because there is no case where we want to feed
124  * the backlog without incrementing the buffer. */
125 void feedReplicationBacklog(void *ptr, size_t len) {
126     unsigned char *p = ptr;
127 
128     server.master_repl_offset += len;
129 
130     /* This is a circular buffer, so write as much data we can at every
131      * iteration and rewind the "idx" index if we reach the limit. */
132     while(len) {
133         size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
134         if (thislen > len) thislen = len;
135         memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
136         server.repl_backlog_idx += thislen;
137         if (server.repl_backlog_idx == server.repl_backlog_size)
138             server.repl_backlog_idx = 0;
139         len -= thislen;
140         p += thislen;
141         server.repl_backlog_histlen += thislen;
142     }
143     if (server.repl_backlog_histlen > server.repl_backlog_size)
144         server.repl_backlog_histlen = server.repl_backlog_size;
145     /* Set the offset of the first byte we have in the backlog. */
146     server.repl_backlog_off = server.master_repl_offset -
147                               server.repl_backlog_histlen + 1;
148 }
149 
150 /* Wrapper for feedReplicationBacklog() that takes Redis string objects
151  * as input. */
152 void feedReplicationBacklogWithObject(robj *o) {
153     char llstr[REDIS_LONGSTR_SIZE];
154     void *p;
155     size_t len;
156 
157     if (o->encoding == REDIS_ENCODING_INT) {
158         len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
159         p = llstr;
160     } else {
161         len = sdslen(o->ptr);
162         p = o->ptr;
163     }
164     feedReplicationBacklog(p,len);
165 }
166 
167 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
168     listNode *ln;
169     listIter li;
170     int j, len;
171     char llstr[REDIS_LONGSTR_SIZE];
172 
173     /* If there aren't slaves, and there is no backlog buffer to populate,
174      * we can return ASAP. */
175     if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
176 
177     /* We can't have slaves attached and no backlog. */
178     redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
179 
180     /* Send SELECT command to every slave if needed. */
181     if (server.slaveseldb != dictid) {
182         robj *selectcmd;
183 
184         /* For a few DBs we have pre-computed SELECT command. */
185         if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
186             selectcmd = shared.select[dictid];
187         } else {
188             int dictid_len;
189 
190             dictid_len = ll2string(llstr,sizeof(llstr),dictid);
191             selectcmd = createObject(REDIS_STRING,
192                 sdscatprintf(sdsempty(),
193                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
194                 dictid_len, llstr));
195         }
196 
197         /* Add the SELECT command into the backlog. */
198         if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
199 
200         /* Send it to slaves. */
201         listRewind(slaves,&li);
202         while((ln = listNext(&li))) {
203             redisClient *slave = ln->value;
204             addReply(slave,selectcmd);
205         }
206 
207         if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
208             decrRefCount(selectcmd);
209     }
210     server.slaveseldb = dictid;
211 
212     /* Write the command to the replication backlog if any. */
213     if (server.repl_backlog) {
214         char aux[REDIS_LONGSTR_SIZE+3];
215 
216         /* Add the multi bulk reply length. */
217         aux[0] = '*';
218         len = ll2string(aux+1,sizeof(aux)-1,argc);
219         aux[len+1] = '\r';
220         aux[len+2] = '\n';
221         feedReplicationBacklog(aux,len+3);
222 
223         for (j = 0; j < argc; j++) {
224             long objlen = stringObjectLen(argv[j]);
225 
226             /* We need to feed the buffer with the object as a bulk reply
227              * not just as a plain string, so create the $..CRLF payload len
228              * and add the final CRLF */
229             aux[0] = '$';
230             len = ll2string(aux+1,sizeof(aux)-1,objlen);
231             aux[len+1] = '\r';
232             aux[len+2] = '\n';
233             feedReplicationBacklog(aux,len+3);
234             feedReplicationBacklogWithObject(argv[j]);
235             feedReplicationBacklog(aux+len+1,2);
236         }
237     }
238 
239     /* Write the command to every slave. */
240     listRewind(server.slaves,&li);
241     while((ln = listNext(&li))) {
242         redisClient *slave = ln->value;
243 
244         /* Don't feed slaves that are still waiting for BGSAVE to start */
245         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
246 
247         /* Feed slaves that are waiting for the initial SYNC (so these commands
248          * are queued in the output buffer until the initial SYNC completes),
249          * or are already in sync with the master. */
250 
251         /* Add the multi bulk length. */
252         addReplyMultiBulkLen(slave,argc);
253 
254         /* Finally any additional argument that was not stored inside the
255          * static buffer if any (from j to argc). */
256         for (j = 0; j < argc; j++)
257             addReplyBulk(slave,argv[j]);
258     }
259 }
260 
261 void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
262     listNode *ln;
263     listIter li;
264     int j;
265     sds cmdrepr = sdsnew("+");
266     robj *cmdobj;
267     struct timeval tv;
268 
269     gettimeofday(&tv,NULL);
270     cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
271     if (c->flags & REDIS_LUA_CLIENT) {
272         cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
273     } else if (c->flags & REDIS_UNIX_SOCKET) {
274         cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
275     } else {
276         cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
277     }
278 
279     for (j = 0; j < argc; j++) {
280         if (argv[j]->encoding == REDIS_ENCODING_INT) {
281             cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
282         } else {
283             cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
284                         sdslen(argv[j]->ptr));
285         }
286         if (j != argc-1)
287             cmdrepr = sdscatlen(cmdrepr," ",1);
288     }
289     cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
290     cmdobj = createObject(REDIS_STRING,cmdrepr);
291 
292     listRewind(monitors,&li);
293     while((ln = listNext(&li))) {
294         redisClient *monitor = ln->value;
295         addReply(monitor,cmdobj);
296     }
297     decrRefCount(cmdobj);
298 }
299 
300 /* Feed the slave 'c' with the replication backlog starting from the
301  * specified 'offset' up to the end of the backlog. */
302 long long addReplyReplicationBacklog(redisClient *c, long long offset) {
303     long long j, skip, len;
304 
305     redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset);
306 
307     if (server.repl_backlog_histlen == 0) {
308         redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero");
309         return 0;
310     }
311 
312     redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld",
313              server.repl_backlog_size);
314     redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld",
315              server.repl_backlog_off);
316     redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld",
317              server.repl_backlog_histlen);
318     redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld",
319              server.repl_backlog_idx);
320 
321     /* Compute the amount of bytes we need to discard. */
322     skip = offset - server.repl_backlog_off;
323     redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip);
324 
325     /* Point j to the oldest byte, that is actaully our
326      * server.repl_backlog_off byte. */
327     j = (server.repl_backlog_idx +
328         (server.repl_backlog_size-server.repl_backlog_histlen)) %
329         server.repl_backlog_size;
330     redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j);
331 
332     /* Discard the amount of data to seek to the specified 'offset'. */
333     j = (j + skip) % server.repl_backlog_size;
334 
335     /* Feed slave with data. Since it is a circular buffer we have to
336      * split the reply in two parts if we are cross-boundary. */
337     len = server.repl_backlog_histlen - skip;
338     redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len);
339     while(len) {
340         long long thislen =
341             ((server.repl_backlog_size - j) < len) ?
342             (server.repl_backlog_size - j) : len;
343 
344         redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
345         addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
346         len -= thislen;
347         j = 0;
348     }
349     return server.repl_backlog_histlen - skip;
350 }
351 
352 /* This function handles the PSYNC command from the point of view of a
353  * master receiving a request for partial resynchronization.
354  *
355  * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
356  * with the usual full resync. */
357 int masterTryPartialResynchronization(redisClient *c) {
358     long long psync_offset, psync_len;
359     char *master_runid = c->argv[1]->ptr;
360     char buf[128];
361     int buflen;
362 
363     /* Is the runid of this master the same advertised by the wannabe slave
364      * via PSYNC? If runid changed this master is a different instance and
365      * there is no way to continue. */
366     if (strcasecmp(master_runid, server.runid)) {
367         /* Run id "?" is used by slaves that want to force a full resync. */
368         if (master_runid[0] != '?') {
369             redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
370                 "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
371                 master_runid, server.runid);
372         } else {
373             redisLog(REDIS_NOTICE,"Full resync requested by slave %s",
374                 replicationGetSlaveName(c));
375         }
376         goto need_full_resync;
377     }
378 
379     /* We still have the data our slave is asking for? */
380     if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
381        REDIS_OK) goto need_full_resync;
382     if (!server.repl_backlog ||
383         psync_offset < server.repl_backlog_off ||
384         psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
385     {
386         redisLog(REDIS_NOTICE,
387             "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
388         if (psync_offset > server.master_repl_offset) {
389             redisLog(REDIS_WARNING,
390                 "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
391         }
392         goto need_full_resync;
393     }
394 
395     /* If we reached this point, we are able to perform a partial resync:
396      * 1) Set client state to make it a slave.
397      * 2) Inform the client we can continue with +CONTINUE
398      * 3) Send the backlog data (from the offset to the end) to the slave. */
399     c->flags |= REDIS_SLAVE;
400     c->replstate = REDIS_REPL_ONLINE;
401     c->repl_ack_time = server.unixtime;
402     c->repl_put_online_on_ack = 0;
403     listAddNodeTail(server.slaves,c);
404     /* We can't use the connection buffers since they are used to accumulate
405      * new commands at this stage. But we are sure the socket send buffer is
406      * empty so this write will never fail actually. */
407     buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
408     if (write(c->fd,buf,buflen) != buflen) {
409         freeClientAsync(c);
410         return REDIS_OK;
411     }
412     psync_len = addReplyReplicationBacklog(c,psync_offset);
413     redisLog(REDIS_NOTICE,
414         "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
415             replicationGetSlaveName(c),
416             psync_len, psync_offset);
417     /* Note that we don't need to set the selected DB at server.slaveseldb
418      * to -1 to force the master to emit SELECT, since the slave already
419      * has this state from the previous connection with the master. */
420 
421     refreshGoodSlavesCount();
422     return REDIS_OK; /* The caller can return, no full resync needed. */
423 
424 need_full_resync:
425     /* We need a full resync for some reason... notify the client. */
426     psync_offset = server.master_repl_offset;
427     /* Add 1 to psync_offset if it the replication backlog does not exists
428      * as when it will be created later we'll increment the offset by one. */
429     if (server.repl_backlog == NULL) psync_offset++;
430     /* Again, we can't use the connection buffers (see above). */
431     buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
432                       server.runid,psync_offset);
433     if (write(c->fd,buf,buflen) != buflen) {
434         freeClientAsync(c);
435         return REDIS_OK;
436     }
437     return REDIS_ERR;
438 }
439 
440 /* Start a BGSAVE for replication goals, which is, selecting the disk or
441  * socket target depending on the configuration, and making sure that
442  * the script cache is flushed before to start.
443  *
444  * Returns REDIS_OK on success or REDIS_ERR otherwise. */
445 int startBgsaveForReplication(void) {
446     int retval;
447 
448     redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s",
449         server.repl_diskless_sync ? "slaves sockets" : "disk");
450 
451     if (server.repl_diskless_sync)
452         retval = rdbSaveToSlavesSockets();
453     else
454         retval = rdbSaveBackground(server.rdb_filename);
455 
456     /* Flush the script cache, since we need that slave differences are
457      * accumulated without requiring slaves to match our cached scripts. */
458     if (retval == REDIS_OK) replicationScriptCacheFlush();
459     return retval;
460 }
461 
462 /* SYNC and PSYNC command implemenation. */
463 void syncCommand(redisClient *c) {
464     /* ignore SYNC if already slave or in monitor mode */
465     if (c->flags & REDIS_SLAVE) return;
466 
467     /* Refuse SYNC requests if we are a slave but the link with our master
468      * is not ok... */
469     if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
470         addReplyError(c,"Can't SYNC while not connected with my master");
471         return;
472     }
473 
474     /* SYNC can't be issued when the server has pending data to send to
475      * the client about already issued commands. We need a fresh reply
476      * buffer registering the differences between the BGSAVE and the current
477      * dataset, so that we can copy to other slaves if needed. */
478     if (listLength(c->reply) != 0 || c->bufpos != 0) {
479         addReplyError(c,"SYNC and PSYNC are invalid with pending output");
480         return;
481     }
482 
483     redisLog(REDIS_NOTICE,"Slave %s asks for synchronization",
484         replicationGetSlaveName(c));
485 
486     /* Try a partial resynchronization if this is a PSYNC command.
487      * If it fails, we continue with usual full resynchronization, however
488      * when this happens masterTryPartialResynchronization() already
489      * replied with:
490      *
491      * +FULLRESYNC <runid> <offset>
492      *
493      * So the slave knows the new runid and offset to try a PSYNC later
494      * if the connection with the master is lost. */
495     if (!strcasecmp(c->argv[0]->ptr,"psync")) {
496         if (masterTryPartialResynchronization(c) == REDIS_OK) {
497             server.stat_sync_partial_ok++;
498             return; /* No full resync needed, return. */
499         } else {
500             char *master_runid = c->argv[1]->ptr;
501 
502             /* Increment stats for failed PSYNCs, but only if the
503              * runid is not "?", as this is used by slaves to force a full
504              * resync on purpose when they are not albe to partially
505              * resync. */
506             if (master_runid[0] != '?') server.stat_sync_partial_err++;
507         }
508     } else {
509         /* If a slave uses SYNC, we are dealing with an old implementation
510          * of the replication protocol (like redis-cli --slave). Flag the client
511          * so that we don't expect to receive REPLCONF ACK feedbacks. */
512         c->flags |= REDIS_PRE_PSYNC;
513     }
514 
515     /* Full resynchronization. */
516     server.stat_sync_full++;
517 
518     /* Here we need to check if there is a background saving operation
519      * in progress, or if it is required to start one */
520     if (server.rdb_child_pid != -1 &&
521         server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK)
522     {
523         /* Ok a background save is in progress. Let's check if it is a good
524          * one for replication, i.e. if there is another slave that is
525          * registering differences since the server forked to save. */
526         redisClient *slave;
527         listNode *ln;
528         listIter li;
529 
530         listRewind(server.slaves,&li);
531         while((ln = listNext(&li))) {
532             slave = ln->value;
533             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
534         }
535         if (ln) {
536             /* Perfect, the server is already registering differences for
537              * another slave. Set the right state, and copy the buffer. */
538             copyClientOutputBuffer(c,slave);
539             c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
540             redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
541         } else {
542             /* No way, we need to wait for the next BGSAVE in order to
543              * register differences. */
544             c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
545             redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
546         }
547     } else if (server.rdb_child_pid != -1 &&
548                server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET)
549     {
550         /* There is an RDB child process but it is writing directly to
551          * children sockets. We need to wait for the next BGSAVE
552          * in order to synchronize. */
553         c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
554         redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
555     } else {
556         if (server.repl_diskless_sync) {
557             /* Diskless replication RDB child is created inside
558              * replicationCron() since we want to delay its start a
559              * few seconds to wait for more slaves to arrive. */
560             c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
561             if (server.repl_diskless_sync_delay)
562                 redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC");
563         } else {
564             /* Ok we don't have a BGSAVE in progress, let's start one. */
565             if (startBgsaveForReplication() != REDIS_OK) {
566                 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
567                 addReplyError(c,"Unable to perform background save");
568                 return;
569             }
570             c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
571         }
572     }
573 
574     if (server.repl_disable_tcp_nodelay)
575         anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
576     c->repldbfd = -1;
577     c->flags |= REDIS_SLAVE;
578     server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
579     listAddNodeTail(server.slaves,c);
580     if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
581         createReplicationBacklog();
582     return;
583 }
584 
585 /* REPLCONF <option> <value> <option> <value> ...
586  * This command is used by a slave in order to configure the replication
587  * process before starting it with the SYNC command.
588  *
589  * Currently the only use of this command is to communicate to the master
590  * what is the listening port of the Slave redis instance, so that the
591  * master can accurately list slaves and their listening ports in
592  * the INFO output.
593  *
594  * In the future the same command can be used in order to configure
595  * the replication to initiate an incremental replication instead of a
596  * full resync. */
597 void replconfCommand(redisClient *c) {
598     int j;
599 
600     if ((c->argc % 2) == 0) {
601         /* Number of arguments must be odd to make sure that every
602          * option has a corresponding value. */
603         addReply(c,shared.syntaxerr);
604         return;
605     }
606 
607     /* Process every option-value pair. */
608     for (j = 1; j < c->argc; j+=2) {
609         if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
610             long port;
611 
612             if ((getLongFromObjectOrReply(c,c->argv[j+1],
613                     &port,NULL) != REDIS_OK))
614                 return;
615             c->slave_listening_port = port;
616         } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
617             /* REPLCONF ACK is used by slave to inform the master the amount
618              * of replication stream that it processed so far. It is an
619              * internal only command that normal clients should never use. */
620             long long offset;
621 
622             if (!(c->flags & REDIS_SLAVE)) return;
623             if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK))
624                 return;
625             if (offset > c->repl_ack_off)
626                 c->repl_ack_off = offset;
627             c->repl_ack_time = server.unixtime;
628             /* If this was a diskless replication, we need to really put
629              * the slave online when the first ACK is received (which
630              * confirms slave is online and ready to get more data). */
631             if (c->repl_put_online_on_ack && c->replstate == REDIS_REPL_ONLINE)
632                 putSlaveOnline(c);
633             /* Note: this command does not reply anything! */
634             return;
635         } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
636             /* REPLCONF GETACK is used in order to request an ACK ASAP
637              * to the slave. */
638             if (server.masterhost && server.master) replicationSendAck();
639             /* Note: this command does not reply anything! */
640         } else {
641             addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
642                 (char*)c->argv[j]->ptr);
643             return;
644         }
645     }
646     addReply(c,shared.ok);
647 }
648 
649 /* This function puts a slave in the online state, and should be called just
650  * after a slave received the RDB file for the initial synchronization, and
651  * we are finally ready to send the incremental stream of commands.
652  *
653  * It does a few things:
654  *
655  * 1) Put the slave in ONLINE state.
656  * 2) Make sure the writable event is re-installed, since calling the SYNC
657  *    command disables it, so that we can accumulate output buffer without
658  *    sending it to the slave.
659  * 3) Update the count of good slaves. */
660 void putSlaveOnline(redisClient *slave) {
661     slave->replstate = REDIS_REPL_ONLINE;
662     slave->repl_put_online_on_ack = 0;
663     slave->repl_ack_time = server.unixtime;
664     if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
665         sendReplyToClient, slave) == AE_ERR) {
666         redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
667         freeClient(slave);
668         return;
669     }
670     refreshGoodSlavesCount();
671     redisLog(REDIS_NOTICE,"Synchronization with slave %s succeeded",
672         replicationGetSlaveName(slave));
673 }
674 
675 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
676     redisClient *slave = privdata;
677     REDIS_NOTUSED(el);
678     REDIS_NOTUSED(mask);
679     char buf[REDIS_IOBUF_LEN];
680     ssize_t nwritten, buflen;
681 
682     /* Before sending the RDB file, we send the preamble as configured by the
683      * replication process. Currently the preamble is just the bulk count of
684      * the file in the form "$<length>\r\n". */
685     if (slave->replpreamble) {
686         nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
687         if (nwritten == -1) {
688             redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",
689                 strerror(errno));
690             freeClient(slave);
691             return;
692         }
693         server.stat_net_output_bytes += nwritten;
694         sdsrange(slave->replpreamble,nwritten,-1);
695         if (sdslen(slave->replpreamble) == 0) {
696             sdsfree(slave->replpreamble);
697             slave->replpreamble = NULL;
698             /* fall through sending data. */
699         } else {
700             return;
701         }
702     }
703 
704     /* If the preamble was already transfered, send the RDB bulk data. */
705     lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
706     buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
707     if (buflen <= 0) {
708         redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
709             (buflen == 0) ? "premature EOF" : strerror(errno));
710         freeClient(slave);
711         return;
712     }
713     if ((nwritten = write(fd,buf,buflen)) == -1) {
714         if (errno != EAGAIN) {
715             redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",
716                 strerror(errno));
717             freeClient(slave);
718         }
719         return;
720     }
721     slave->repldboff += nwritten;
722     server.stat_net_output_bytes += nwritten;
723     if (slave->repldboff == slave->repldbsize) {
724         close(slave->repldbfd);
725         slave->repldbfd = -1;
726         aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
727         putSlaveOnline(slave);
728     }
729 }
730 
731 /* This function is called at the end of every background saving,
732  * or when the replication RDB transfer strategy is modified from
733  * disk to socket or the other way around.
734  *
735  * The goal of this function is to handle slaves waiting for a successful
736  * background saving in order to perform non-blocking synchronization, and
737  * to schedule a new BGSAVE if there are slaves that attached while a
738  * BGSAVE was in progress, but it was not a good one for replication (no
739  * other slave was accumulating differences).
740  *
741  * The argument bgsaveerr is REDIS_OK if the background saving succeeded
742  * otherwise REDIS_ERR is passed to the function.
743  * The 'type' argument is the type of the child that terminated
744  * (if it had a disk or socket target). */
745 void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
746     listNode *ln;
747     int startbgsave = 0;
748     listIter li;
749 
750     listRewind(server.slaves,&li);
751     while((ln = listNext(&li))) {
752         redisClient *slave = ln->value;
753 
754         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
755             startbgsave = 1;
756             slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
757         } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
758             struct redis_stat buf;
759 
760             /* If this was an RDB on disk save, we have to prepare to send
761              * the RDB from disk to the slave socket. Otherwise if this was
762              * already an RDB -> Slaves socket transfer, used in the case of
763              * diskless replication, our work is trivial, we can just put
764              * the slave online. */
765             if (type == REDIS_RDB_CHILD_TYPE_SOCKET) {
766                 redisLog(REDIS_NOTICE,
767                     "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
768                         replicationGetSlaveName(slave));
769                 /* Note: we wait for a REPLCONF ACK message from slave in
770                  * order to really put it online (install the write handler
771                  * so that the accumulated data can be transfered). However
772                  * we change the replication state ASAP, since our slave
773                  * is technically online now. */
774                 slave->replstate = REDIS_REPL_ONLINE;
775                 slave->repl_put_online_on_ack = 1;
776             } else {
777                 if (bgsaveerr != REDIS_OK) {
778                     freeClient(slave);
779                     redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
780                     continue;
781                 }
782                 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
783                     redis_fstat(slave->repldbfd,&buf) == -1) {
784                     freeClient(slave);
785                     redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
786                     continue;
787                 }
788                 slave->repldboff = 0;
789                 slave->repldbsize = buf.st_size;
790                 slave->replstate = REDIS_REPL_SEND_BULK;
791                 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
792                     (unsigned long long) slave->repldbsize);
793 
794                 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
795                 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
796                     freeClient(slave);
797                     continue;
798                 }
799             }
800         }
801     }
802     if (startbgsave) {
803         if (startBgsaveForReplication() != REDIS_OK) {
804             listIter li;
805 
806             listRewind(server.slaves,&li);
807             redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
808             while((ln = listNext(&li))) {
809                 redisClient *slave = ln->value;
810 
811                 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
812                     freeClient(slave);
813             }
814         }
815     }
816 }
817 
818 /* ----------------------------------- SLAVE -------------------------------- */
819 
820 /* Abort the async download of the bulk dataset while SYNC-ing with master */
821 void replicationAbortSyncTransfer(void) {
822     redisAssert(server.repl_state == REDIS_REPL_TRANSFER);
823 
824     aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
825     close(server.repl_transfer_s);
826     close(server.repl_transfer_fd);
827     unlink(server.repl_transfer_tmpfile);
828     zfree(server.repl_transfer_tmpfile);
829     server.repl_state = REDIS_REPL_CONNECT;
830 }
831 
832 /* Avoid the master to detect the slave is timing out while loading the
833  * RDB file in initial synchronization. We send a single newline character
834  * that is valid protocol but is guaranteed to either be sent entierly or
835  * not, since the byte is indivisible.
836  *
837  * The function is called in two contexts: while we flush the current
838  * data with emptyDb(), and while we load the new data received as an
839  * RDB file from the master. */
840 void replicationSendNewlineToMaster(void) {
841     static time_t newline_sent;
842     if (time(NULL) != newline_sent) {
843         newline_sent = time(NULL);
844         if (write(server.repl_transfer_s,"\n",1) == -1) {
845             /* Pinging back in this stage is best-effort. */
846         }
847     }
848 }
849 
850 /* Callback used by emptyDb() while flushing away old data to load
851  * the new dataset received by the master. */
852 void replicationEmptyDbCallback(void *privdata) {
853     REDIS_NOTUSED(privdata);
854     replicationSendNewlineToMaster();
855 }
856 
857 /* Once we have a link with the master and the synchroniziation was
858  * performed, this function materializes the master client we store
859  * at server.master, starting from the specified file descriptor. */
860 void replicationCreateMasterClient(int fd) {
861     server.master = createClient(fd);
862     server.master->flags |= REDIS_MASTER;
863     server.master->authenticated = 1;
864     server.repl_state = REDIS_REPL_CONNECTED;
865     server.master->reploff = server.repl_master_initial_offset;
866     memcpy(server.master->replrunid, server.repl_master_runid,
867         sizeof(server.repl_master_runid));
868     /* If master offset is set to -1, this master is old and is not
869      * PSYNC capable, so we flag it accordingly. */
870     if (server.master->reploff == -1)
871         server.master->flags |= REDIS_PRE_PSYNC;
872 }
873 
874 /* Asynchronously read the SYNC payload we receive from a master */
875 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
876 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
877     char buf[4096];
878     ssize_t nread, readlen;
879     off_t left;
880     REDIS_NOTUSED(el);
881     REDIS_NOTUSED(privdata);
882     REDIS_NOTUSED(mask);
883 
884     /* Static vars used to hold the EOF mark, and the last bytes received
885      * form the server: when they match, we reached the end of the transfer. */
886     static char eofmark[REDIS_RUN_ID_SIZE];
887     static char lastbytes[REDIS_RUN_ID_SIZE];
888     static int usemark = 0;
889 
890     /* If repl_transfer_size == -1 we still have to read the bulk length
891      * from the master reply. */
892     if (server.repl_transfer_size == -1) {
893         if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
894             redisLog(REDIS_WARNING,
895                 "I/O error reading bulk count from MASTER: %s",
896                 strerror(errno));
897             goto error;
898         }
899 
900         if (buf[0] == '-') {
901             redisLog(REDIS_WARNING,
902                 "MASTER aborted replication with an error: %s",
903                 buf+1);
904             goto error;
905         } else if (buf[0] == '\0') {
906             /* At this stage just a newline works as a PING in order to take
907              * the connection live. So we refresh our last interaction
908              * timestamp. */
909             server.repl_transfer_lastio = server.unixtime;
910             return;
911         } else if (buf[0] != '$') {
912             redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
913             goto error;
914         }
915 
916         /* There are two possible forms for the bulk payload. One is the
917          * usual $<count> bulk format. The other is used for diskless transfers
918          * when the master does not know beforehand the size of the file to
919          * transfer. In the latter case, the following format is used:
920          *
921          * $EOF:<40 bytes delimiter>
922          *
923          * At the end of the file the announced delimiter is transmitted. The
924          * delimiter is long and random enough that the probability of a
925          * collision with the actual file content can be ignored. */
926         if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) {
927             usemark = 1;
928             memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE);
929             memset(lastbytes,0,REDIS_RUN_ID_SIZE);
930             /* Set any repl_transfer_size to avoid entering this code path
931              * at the next call. */
932             server.repl_transfer_size = 0;
933             redisLog(REDIS_NOTICE,
934                 "MASTER <-> SLAVE sync: receiving streamed RDB from master");
935         } else {
936             usemark = 0;
937             server.repl_transfer_size = strtol(buf+1,NULL,10);
938             redisLog(REDIS_NOTICE,
939                 "MASTER <-> SLAVE sync: receiving %lld bytes from master",
940                 (long long) server.repl_transfer_size);
941         }
942         return;
943     }
944 
945     /* Read bulk data */
946     if (usemark) {
947         readlen = sizeof(buf);
948     } else {
949         left = server.repl_transfer_size - server.repl_transfer_read;
950         readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
951     }
952 
953     nread = read(fd,buf,readlen);
954     if (nread <= 0) {
955         redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
956             (nread == -1) ? strerror(errno) : "connection lost");
957         replicationAbortSyncTransfer();
958         return;
959     }
960     server.stat_net_input_bytes += nread;
961 
962     /* When a mark is used, we want to detect EOF asap in order to avoid
963      * writing the EOF mark into the file... */
964     int eof_reached = 0;
965 
966     if (usemark) {
967         /* Update the last bytes array, and check if it matches our delimiter.*/
968         if (nread >= REDIS_RUN_ID_SIZE) {
969             memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE);
970         } else {
971             int rem = REDIS_RUN_ID_SIZE-nread;
972             memmove(lastbytes,lastbytes+nread,rem);
973             memcpy(lastbytes+rem,buf,nread);
974         }
975         if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1;
976     }
977 
978     server.repl_transfer_lastio = server.unixtime;
979     if (write(server.repl_transfer_fd,buf,nread) != nread) {
980         redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
981         goto error;
982     }
983     server.repl_transfer_read += nread;
984 
985     /* Delete the last 40 bytes from the file if we reached EOF. */
986     if (usemark && eof_reached) {
987         if (ftruncate(server.repl_transfer_fd,
988             server.repl_transfer_read - REDIS_RUN_ID_SIZE) == -1)
989         {
990             redisLog(REDIS_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
991             goto error;
992         }
993     }
994 
995     /* Sync data on disk from time to time, otherwise at the end of the transfer
996      * we may suffer a big delay as the memory buffers are copied into the
997      * actual disk. */
998     if (server.repl_transfer_read >=
999         server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
1000     {
1001         off_t sync_size = server.repl_transfer_read -
1002                           server.repl_transfer_last_fsync_off;
1003         rdb_fsync_range(server.repl_transfer_fd,
1004             server.repl_transfer_last_fsync_off, sync_size);
1005         server.repl_transfer_last_fsync_off += sync_size;
1006     }
1007 
1008     /* Check if the transfer is now complete */
1009     if (!usemark) {
1010         if (server.repl_transfer_read == server.repl_transfer_size)
1011             eof_reached = 1;
1012     }
1013 
1014     if (eof_reached) {
1015         if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
1016             redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
1017             replicationAbortSyncTransfer();
1018             return;
1019         }
1020         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
1021         signalFlushedDb(-1);
1022         emptyDb(replicationEmptyDbCallback);
1023         /* Before loading the DB into memory we need to delete the readable
1024          * handler, otherwise it will get called recursively since
1025          * rdbLoad() will call the event loop to process events from time to
1026          * time for non blocking loading. */
1027         aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
1028         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
1029         if (rdbLoad(server.rdb_filename) != REDIS_OK) {
1030             redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
1031             replicationAbortSyncTransfer();
1032             return;
1033         }
1034         /* Final setup of the connected slave <- master link */
1035         zfree(server.repl_transfer_tmpfile);
1036         close(server.repl_transfer_fd);
1037         replicationCreateMasterClient(server.repl_transfer_s);
1038         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
1039         /* Restart the AOF subsystem now that we finished the sync. This
1040          * will trigger an AOF rewrite, and when done will start appending
1041          * to the new file. */
1042         if (server.aof_state != REDIS_AOF_OFF) {
1043             int retry = 10;
1044 
1045             stopAppendOnly();
1046             while (retry-- && startAppendOnly() == REDIS_ERR) {
1047                 redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
1048                 sleep(1);
1049             }
1050             if (!retry) {
1051                 redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
1052                 exit(1);
1053             }
1054         }
1055     }
1056 
1057     return;
1058 
1059 error:
1060     replicationAbortSyncTransfer();
1061     return;
1062 }
1063 
1064 /* Send a synchronous command to the master. Used to send AUTH and
1065  * REPLCONF commands before starting the replication with SYNC.
1066  *
1067  * The command returns an sds string representing the result of the
1068  * operation. On error the first byte is a "-".
1069  */
1070 char *sendSynchronousCommand(int fd, ...) {
1071     va_list ap;
1072     sds cmd = sdsempty();
1073     char *arg, buf[256];
1074 
1075     /* Create the command to send to the master, we use simple inline
1076      * protocol for simplicity as currently we only send simple strings. */
1077     va_start(ap,fd);
1078     while(1) {
1079         arg = va_arg(ap, char*);
1080         if (arg == NULL) break;
1081 
1082         if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
1083         cmd = sdscat(cmd,arg);
1084     }
1085     cmd = sdscatlen(cmd,"\r\n",2);
1086 
1087     /* Transfer command to the server. */
1088     if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
1089         sdsfree(cmd);
1090         return sdscatprintf(sdsempty(),"-Writing to master: %s",
1091                 strerror(errno));
1092     }
1093     sdsfree(cmd);
1094 
1095     /* Read the reply from the server. */
1096     if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
1097     {
1098         return sdscatprintf(sdsempty(),"-Reading from master: %s",
1099                 strerror(errno));
1100     }
1101     return sdsnew(buf);
1102 }
1103 
1104 /* Try a partial resynchronization with the master if we are about to reconnect.
1105  * If there is no cached master structure, at least try to issue a
1106  * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
1107  * command in order to obtain the master run id and the master replication
1108  * global offset.
1109  *
1110  * This function is designed to be called from syncWithMaster(), so the
1111  * following assumptions are made:
1112  *
1113  * 1) We pass the function an already connected socket "fd".
1114  * 2) This function does not close the file descriptor "fd". However in case
1115  *    of successful partial resynchronization, the function will reuse
1116  *    'fd' as file descriptor of the server.master client structure.
1117  *
1118  * The function returns:
1119  *
1120  * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue.
1121  * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
1122  *                   In this case the master run_id and global replication
1123  *                   offset is saved.
1124  * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
1125  *                      the caller should fall back to SYNC.
1126  */
1127 
1128 #define PSYNC_CONTINUE 0
1129 #define PSYNC_FULLRESYNC 1
1130 #define PSYNC_NOT_SUPPORTED 2
1131 int slaveTryPartialResynchronization(int fd) {
1132     char *psync_runid;
1133     char psync_offset[32];
1134     sds reply;
1135 
1136     /* Initially set repl_master_initial_offset to -1 to mark the current
1137      * master run_id and offset as not valid. Later if we'll be able to do
1138      * a FULL resync using the PSYNC command we'll set the offset at the
1139      * right value, so that this information will be propagated to the
1140      * client structure representing the master into server.master. */
1141     server.repl_master_initial_offset = -1;
1142 
1143     if (server.cached_master) {
1144         psync_runid = server.cached_master->replrunid;
1145         snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
1146         redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
1147     } else {
1148         redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
1149         psync_runid = "?";
1150         memcpy(psync_offset,"-1",3);
1151     }
1152 
1153     /* Issue the PSYNC command */
1154     reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
1155 
1156     if (!strncmp(reply,"+FULLRESYNC",11)) {
1157         char *runid = NULL, *offset = NULL;
1158 
1159         /* FULL RESYNC, parse the reply in order to extract the run id
1160          * and the replication offset. */
1161         runid = strchr(reply,' ');
1162         if (runid) {
1163             runid++;
1164             offset = strchr(runid,' ');
1165             if (offset) offset++;
1166         }
1167         if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
1168             redisLog(REDIS_WARNING,
1169                 "Master replied with wrong +FULLRESYNC syntax.");
1170             /* This is an unexpected condition, actually the +FULLRESYNC
1171              * reply means that the master supports PSYNC, but the reply
1172              * format seems wrong. To stay safe we blank the master
1173              * runid to make sure next PSYNCs will fail. */
1174             memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
1175         } else {
1176             memcpy(server.repl_master_runid, runid, offset-runid-1);
1177             server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
1178             server.repl_master_initial_offset = strtoll(offset,NULL,10);
1179             redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
1180                 server.repl_master_runid,
1181                 server.repl_master_initial_offset);
1182         }
1183         /* We are going to full resync, discard the cached master structure. */
1184         replicationDiscardCachedMaster();
1185         sdsfree(reply);
1186         return PSYNC_FULLRESYNC;
1187     }
1188 
1189     if (!strncmp(reply,"+CONTINUE",9)) {
1190         /* Partial resync was accepted, set the replication state accordingly */
1191         redisLog(REDIS_NOTICE,
1192             "Successful partial resynchronization with master.");
1193         sdsfree(reply);
1194         replicationResurrectCachedMaster(fd);
1195         return PSYNC_CONTINUE;
1196     }
1197 
1198     /* If we reach this point we receied either an error since the master does
1199      * not understand PSYNC, or an unexpected reply from the master.
1200      * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
1201 
1202     if (strncmp(reply,"-ERR",4)) {
1203         /* If it's not an error, log the unexpected event. */
1204         redisLog(REDIS_WARNING,
1205             "Unexpected reply to PSYNC from master: %s", reply);
1206     } else {
1207         redisLog(REDIS_NOTICE,
1208             "Master does not support PSYNC or is in "
1209             "error state (reply: %s)", reply);
1210     }
1211     sdsfree(reply);
1212     replicationDiscardCachedMaster();
1213     return PSYNC_NOT_SUPPORTED;
1214 }
1215 
1216 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
1217     char tmpfile[256], *err;
1218     int dfd, maxtries = 5;
1219     int sockerr = 0, psync_result;
1220     socklen_t errlen = sizeof(sockerr);
1221     REDIS_NOTUSED(el);
1222     REDIS_NOTUSED(privdata);
1223     REDIS_NOTUSED(mask);
1224 
1225     /* If this event fired after the user turned the instance into a master
1226      * with SLAVEOF NO ONE we must just return ASAP. */
1227     if (server.repl_state == REDIS_REPL_NONE) {
1228         close(fd);
1229         return;
1230     }
1231 
1232     /* Check for errors in the socket. */
1233     if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
1234         sockerr = errno;
1235     if (sockerr) {
1236         aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
1237         redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
1238             strerror(sockerr));
1239         goto error;
1240     }
1241 
1242     /* If we were connecting, it's time to send a non blocking PING, we want to
1243      * make sure the master is able to reply before going into the actual
1244      * replication process where we have long timeouts in the order of
1245      * seconds (in the meantime the slave would block). */
1246     if (server.repl_state == REDIS_REPL_CONNECTING) {
1247         redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
1248         /* Delete the writable event so that the readable event remains
1249          * registered and we can wait for the PONG reply. */
1250         aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
1251         server.repl_state = REDIS_REPL_RECEIVE_PONG;
1252         /* Send the PING, don't check for errors at all, we have the timeout
1253          * that will take care about this. */
1254         syncWrite(fd,"PING\r\n",6,100);
1255         return;
1256     }
1257 
1258     /* Receive the PONG command. */
1259     if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
1260         char buf[1024];
1261 
1262         /* Delete the readable event, we no longer need it now that there is
1263          * the PING reply to read. */
1264         aeDeleteFileEvent(server.el,fd,AE_READABLE);
1265 
1266         /* Read the reply with explicit timeout. */
1267         buf[0] = '\0';
1268         if (syncReadLine(fd,buf,sizeof(buf),
1269             server.repl_syncio_timeout*1000) == -1)
1270         {
1271             redisLog(REDIS_WARNING,
1272                 "I/O error reading PING reply from master: %s",
1273                 strerror(errno));
1274             goto error;
1275         }
1276 
1277         /* We accept only two replies as valid, a positive +PONG reply
1278          * (we just check for "+") or an authentication error.
1279          * Note that older versions of Redis replied with "operation not
1280          * permitted" instead of using a proper error code, so we test
1281          * both. */
1282         if (buf[0] != '+' &&
1283             strncmp(buf,"-NOAUTH",7) != 0 &&
1284             strncmp(buf,"-ERR operation not permitted",28) != 0)
1285         {
1286             redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
1287             goto error;
1288         } else {
1289             redisLog(REDIS_NOTICE,
1290                 "Master replied to PING, replication can continue...");
1291         }
1292     }
1293 
1294     /* AUTH with the master if required. */
1295     if(server.masterauth) {
1296         err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
1297         if (err[0] == '-') {
1298             redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
1299             sdsfree(err);
1300             goto error;
1301         }
1302         sdsfree(err);
1303     }
1304 
1305     /* Set the slave port, so that Master's INFO command can list the
1306      * slave listening port correctly. */
1307     {
1308         sds port = sdsfromlonglong(server.port);
1309         err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
1310                                          NULL);
1311         sdsfree(port);
1312         /* Ignore the error if any, not all the Redis versions support
1313          * REPLCONF listening-port. */
1314         if (err[0] == '-') {
1315             redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
1316         }
1317         sdsfree(err);
1318     }
1319 
1320     /* Try a partial resynchonization. If we don't have a cached master
1321      * slaveTryPartialResynchronization() will at least try to use PSYNC
1322      * to start a full resynchronization so that we get the master run id
1323      * and the global offset, to try a partial resync at the next
1324      * reconnection attempt. */
1325     psync_result = slaveTryPartialResynchronization(fd);
1326     if (psync_result == PSYNC_CONTINUE) {
1327         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
1328         return;
1329     }
1330 
1331     /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
1332      * and the server.repl_master_runid and repl_master_initial_offset are
1333      * already populated. */
1334     if (psync_result == PSYNC_NOT_SUPPORTED) {
1335         redisLog(REDIS_NOTICE,"Retrying with SYNC...");
1336         if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
1337             redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
1338                 strerror(errno));
1339             goto error;
1340         }
1341     }
1342 
1343     /* Prepare a suitable temp file for bulk transfer */
1344     while(maxtries--) {
1345         snprintf(tmpfile,256,
1346             "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
1347         dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
1348         if (dfd != -1) break;
1349         sleep(1);
1350     }
1351     if (dfd == -1) {
1352         redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
1353         goto error;
1354     }
1355 
1356     /* Setup the non blocking download of the bulk file. */
1357     if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
1358             == AE_ERR)
1359     {
1360         redisLog(REDIS_WARNING,
1361             "Can't create readable event for SYNC: %s (fd=%d)",
1362             strerror(errno),fd);
1363         goto error;
1364     }
1365 
1366     server.repl_state = REDIS_REPL_TRANSFER;
1367     server.repl_transfer_size = -1;
1368     server.repl_transfer_read = 0;
1369     server.repl_transfer_last_fsync_off = 0;
1370     server.repl_transfer_fd = dfd;
1371     server.repl_transfer_lastio = server.unixtime;
1372     server.repl_transfer_tmpfile = zstrdup(tmpfile);
1373     return;
1374 
1375 error:
1376     close(fd);
1377     server.repl_transfer_s = -1;
1378     server.repl_state = REDIS_REPL_CONNECT;
1379     return;
1380 }
1381 
1382 int connectWithMaster(void) {
1383     int fd;
1384 
1385     fd = anetTcpNonBlockBindConnect(NULL,
1386         server.masterhost,server.masterport,REDIS_BIND_ADDR);
1387     if (fd == -1) {
1388         redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
1389             strerror(errno));
1390         return REDIS_ERR;
1391     }
1392 
1393     if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
1394             AE_ERR)
1395     {
1396         close(fd);
1397         redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
1398         return REDIS_ERR;
1399     }
1400 
1401     server.repl_transfer_lastio = server.unixtime;
1402     server.repl_transfer_s = fd;
1403     server.repl_state = REDIS_REPL_CONNECTING;
1404     return REDIS_OK;
1405 }
1406 
1407 /* This function can be called when a non blocking connection is currently
1408  * in progress to undo it. */
1409 void undoConnectWithMaster(void) {
1410     int fd = server.repl_transfer_s;
1411 
1412     redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
1413                 server.repl_state == REDIS_REPL_RECEIVE_PONG);
1414     aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
1415     close(fd);
1416     server.repl_transfer_s = -1;
1417     server.repl_state = REDIS_REPL_CONNECT;
1418 }
1419 
1420 /* This function aborts a non blocking replication attempt if there is one
1421  * in progress, by canceling the non-blocking connect attempt or
1422  * the initial bulk transfer.
1423  *
1424  * If there was a replication handshake in progress 1 is returned and
1425  * the replication state (server.repl_state) set to REDIS_REPL_CONNECT.
1426  *
1427  * Otherwise zero is returned and no operation is perforemd at all. */
1428 int cancelReplicationHandshake(void) {
1429     if (server.repl_state == REDIS_REPL_TRANSFER) {
1430         replicationAbortSyncTransfer();
1431     } else if (server.repl_state == REDIS_REPL_CONNECTING ||
1432              server.repl_state == REDIS_REPL_RECEIVE_PONG)
1433     {
1434         undoConnectWithMaster();
1435     } else {
1436         return 0;
1437     }
1438     return 1;
1439 }
1440 
1441 /* Set replication to the specified master address and port. */
1442 void replicationSetMaster(char *ip, int port) {
1443     sdsfree(server.masterhost);
1444     server.masterhost = sdsnew(ip);
1445     server.masterport = port;
1446     if (server.master) freeClient(server.master);
1447     disconnectSlaves(); /* Force our slaves to resync with us as well. */
1448     replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
1449     freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
1450     cancelReplicationHandshake();
1451     server.repl_state = REDIS_REPL_CONNECT;
1452     server.master_repl_offset = 0;
1453     server.repl_down_since = 0;
1454 }
1455 
1456 /* Cancel replication, setting the instance as a master itself. */
1457 void replicationUnsetMaster(void) {
1458     if (server.masterhost == NULL) return; /* Nothing to do. */
1459     sdsfree(server.masterhost);
1460     server.masterhost = NULL;
1461     if (server.master) {
1462         if (listLength(server.slaves) == 0) {
1463             /* If this instance is turned into a master and there are no
1464              * slaves, it inherits the replication offset from the master.
1465              * Under certain conditions this makes replicas comparable by
1466              * replication offset to understand what is the most updated. */
1467             server.master_repl_offset = server.master->reploff;
1468             freeReplicationBacklog();
1469         }
1470         freeClient(server.master);
1471     }
1472     replicationDiscardCachedMaster();
1473     cancelReplicationHandshake();
1474     server.repl_state = REDIS_REPL_NONE;
1475 }
1476 
1477 void slaveofCommand(redisClient *c) {
1478     /* SLAVEOF is not allowed in cluster mode as replication is automatically
1479      * configured using the current address of the master node. */
1480     if (server.cluster_enabled) {
1481         addReplyError(c,"SLAVEOF not allowed in cluster mode.");
1482         return;
1483     }
1484 
1485     /* The special host/port combination "NO" "ONE" turns the instance
1486      * into a master. Otherwise the new master address is set. */
1487     if (!strcasecmp(c->argv[1]->ptr,"no") &&
1488         !strcasecmp(c->argv[2]->ptr,"one")) {
1489         if (server.masterhost) {
1490             replicationUnsetMaster();
1491             redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
1492         }
1493     } else {
1494         long port;
1495 
1496         if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
1497             return;
1498 
1499         /* Check if we are already attached to the specified slave */
1500         if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
1501             && server.masterport == port) {
1502             redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
1503             addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
1504             return;
1505         }
1506         /* There was no previous master or the user specified a different one,
1507          * we can continue. */
1508         replicationSetMaster(c->argv[1]->ptr, port);
1509         redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
1510             server.masterhost, server.masterport);
1511     }
1512     addReply(c,shared.ok);
1513 }
1514 
1515 /* ROLE command: provide information about the role of the instance
1516  * (master or slave) and additional information related to replication
1517  * in an easy to process format. */
1518 void roleCommand(redisClient *c) {
1519     if (server.masterhost == NULL) {
1520         listIter li;
1521         listNode *ln;
1522         void *mbcount;
1523         int slaves = 0;
1524 
1525         addReplyMultiBulkLen(c,3);
1526         addReplyBulkCBuffer(c,"master",6);
1527         addReplyLongLong(c,server.master_repl_offset);
1528         mbcount = addDeferredMultiBulkLength(c);
1529         listRewind(server.slaves,&li);
1530         while((ln = listNext(&li))) {
1531             redisClient *slave = ln->value;
1532             char ip[REDIS_IP_STR_LEN];
1533 
1534             if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) continue;
1535             if (slave->replstate != REDIS_REPL_ONLINE) continue;
1536             addReplyMultiBulkLen(c,3);
1537             addReplyBulkCString(c,ip);
1538             addReplyBulkLongLong(c,slave->slave_listening_port);
1539             addReplyBulkLongLong(c,slave->repl_ack_off);
1540             slaves++;
1541         }
1542         setDeferredMultiBulkLength(c,mbcount,slaves);
1543     } else {
1544         char *slavestate = NULL;
1545 
1546         addReplyMultiBulkLen(c,5);
1547         addReplyBulkCBuffer(c,"slave",5);
1548         addReplyBulkCString(c,server.masterhost);
1549         addReplyLongLong(c,server.masterport);
1550         switch(server.repl_state) {
1551         case REDIS_REPL_NONE: slavestate = "none"; break;
1552         case REDIS_REPL_CONNECT: slavestate = "connect"; break;
1553         case REDIS_REPL_CONNECTING: slavestate = "connecting"; break;
1554         case REDIS_REPL_RECEIVE_PONG: /* see next */
1555         case REDIS_REPL_TRANSFER: slavestate = "sync"; break;
1556         case REDIS_REPL_CONNECTED: slavestate = "connected"; break;
1557         default: slavestate = "unknown"; break;
1558         }
1559         addReplyBulkCString(c,slavestate);
1560         addReplyLongLong(c,server.master ? server.master->reploff : -1);
1561     }
1562 }
1563 
1564 /* Send a REPLCONF ACK command to the master to inform it about the current
1565  * processed offset. If we are not connected with a master, the command has
1566  * no effects. */
1567 void replicationSendAck(void) {
1568     redisClient *c = server.master;
1569 
1570     if (c != NULL) {
1571         c->flags |= REDIS_MASTER_FORCE_REPLY;
1572         addReplyMultiBulkLen(c,3);
1573         addReplyBulkCString(c,"REPLCONF");
1574         addReplyBulkCString(c,"ACK");
1575         addReplyBulkLongLong(c,c->reploff);
1576         c->flags &= ~REDIS_MASTER_FORCE_REPLY;
1577     }
1578 }
1579 
1580 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
1581 
1582 /* In order to implement partial synchronization we need to be able to cache
1583  * our master's client structure after a transient disconnection.
1584  * It is cached into server.cached_master and flushed away using the following
1585  * functions. */
1586 
1587 /* This function is called by freeClient() in order to cache the master
1588  * client structure instead of destryoing it. freeClient() will return
1589  * ASAP after this function returns, so every action needed to avoid problems
1590  * with a client that is really "suspended" has to be done by this function.
1591  *
1592  * The other functions that will deal with the cached master are:
1593  *
1594  * replicationDiscardCachedMaster() that will make sure to kill the client
1595  * as for some reason we don't want to use it in the future.
1596  *
1597  * replicationResurrectCachedMaster() that is used after a successful PSYNC
1598  * handshake in order to reactivate the cached master.
1599  */
1600 void replicationCacheMaster(redisClient *c) {
1601     listNode *ln;
1602 
1603     redisAssert(server.master != NULL && server.cached_master == NULL);
1604     redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
1605 
1606     /* Remove from the list of clients, we don't want this client to be
1607      * listed by CLIENT LIST or processed in any way by batch operations. */
1608     ln = listSearchKey(server.clients,c);
1609     redisAssert(ln != NULL);
1610     listDelNode(server.clients,ln);
1611 
1612     /* Save the master. Server.master will be set to null later by
1613      * replicationHandleMasterDisconnection(). */
1614     server.cached_master = server.master;
1615 
1616     /* Remove the event handlers and close the socket. We'll later reuse
1617      * the socket of the new connection with the master during PSYNC. */
1618     aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1619     aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1620     close(c->fd);
1621 
1622     /* Set fd to -1 so that we can safely call freeClient(c) later. */
1623     c->fd = -1;
1624 
1625     /* Invalidate the Peer ID cache. */
1626     if (c->peerid) {
1627         sdsfree(c->peerid);
1628         c->peerid = NULL;
1629     }
1630 
1631     /* Caching the master happens instead of the actual freeClient() call,
1632      * so make sure to adjust the replication state. This function will
1633      * also set server.master to NULL. */
1634     replicationHandleMasterDisconnection();
1635 }
1636 
1637 /* Free a cached master, called when there are no longer the conditions for
1638  * a partial resync on reconnection. */
1639 void replicationDiscardCachedMaster(void) {
1640     if (server.cached_master == NULL) return;
1641 
1642     redisLog(REDIS_NOTICE,"Discarding previously cached master state.");
1643     server.cached_master->flags &= ~REDIS_MASTER;
1644     freeClient(server.cached_master);
1645     server.cached_master = NULL;
1646 }
1647 
1648 /* Turn the cached master into the current master, using the file descriptor
1649  * passed as argument as the socket for the new master.
1650  *
1651  * This function is called when successfully setup a partial resynchronization
1652  * so the stream of data that we'll receive will start from were this
1653  * master left. */
1654 void replicationResurrectCachedMaster(int newfd) {
1655     server.master = server.cached_master;
1656     server.cached_master = NULL;
1657     server.master->fd = newfd;
1658     server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
1659     server.master->authenticated = 1;
1660     server.master->lastinteraction = server.unixtime;
1661     server.repl_state = REDIS_REPL_CONNECTED;
1662 
1663     /* Re-add to the list of clients. */
1664     listAddNodeTail(server.clients,server.master);
1665     if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
1666                           readQueryFromClient, server.master)) {
1667         redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
1668         freeClientAsync(server.master); /* Close ASAP. */
1669     }
1670 
1671     /* We may also need to install the write handler as well if there is
1672      * pending data in the write buffers. */
1673     if (server.master->bufpos || listLength(server.master->reply)) {
1674         if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
1675                           sendReplyToClient, server.master)) {
1676             redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
1677             freeClientAsync(server.master); /* Close ASAP. */
1678         }
1679     }
1680 }
1681 
1682 /* ------------------------- MIN-SLAVES-TO-WRITE  --------------------------- */
1683 
1684 /* This function counts the number of slaves with lag <= min-slaves-max-lag.
1685  * If the option is active, the server will prevent writes if there are not
1686  * enough connected slaves with the specified lag (or less). */
1687 void refreshGoodSlavesCount(void) {
1688     listIter li;
1689     listNode *ln;
1690     int good = 0;
1691 
1692     if (!server.repl_min_slaves_to_write ||
1693         !server.repl_min_slaves_max_lag) return;
1694 
1695     listRewind(server.slaves,&li);
1696     while((ln = listNext(&li))) {
1697         redisClient *slave = ln->value;
1698         time_t lag = server.unixtime - slave->repl_ack_time;
1699 
1700         if (slave->replstate == REDIS_REPL_ONLINE &&
1701             lag <= server.repl_min_slaves_max_lag) good++;
1702     }
1703     server.repl_good_slaves_count = good;
1704 }
1705 
1706 /* ----------------------- REPLICATION SCRIPT CACHE --------------------------
1707  * The goal of this code is to keep track of scripts already sent to every
1708  * connected slave, in order to be able to replicate EVALSHA as it is without
1709  * translating it to EVAL every time it is possible.
1710  *
1711  * We use a capped collection implemented by a hash table for fast lookup
1712  * of scripts we can send as EVALSHA, plus a linked list that is used for
1713  * eviction of the oldest entry when the max number of items is reached.
1714  *
1715  * We don't care about taking a different cache for every different slave
1716  * since to fill the cache again is not very costly, the goal of this code
1717  * is to avoid that the same big script is trasmitted a big number of times
1718  * per second wasting bandwidth and processor speed, but it is not a problem
1719  * if we need to rebuild the cache from scratch from time to time, every used
1720  * script will need to be transmitted a single time to reappear in the cache.
1721  *
1722  * This is how the system works:
1723  *
1724  * 1) Every time a new slave connects, we flush the whole script cache.
1725  * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
1726  *    trying to convert EVAL into EVALSHA specifically for slaves.
1727  * 3) Every time we trasmit a script as EVAL to the slaves, we also add the
1728  *    corresponding SHA1 of the script into the cache as we are sure every
1729  *    slave knows about the script starting from now.
1730  * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
1731  *    and at the same time flush the script cache.
1732  * 5) When the last slave disconnects, flush the cache.
1733  * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
1734  *    in the master sometimes.
1735  */
1736 
1737 /* Initialize the script cache, only called at startup. */
1738 void replicationScriptCacheInit(void) {
1739     server.repl_scriptcache_size = 10000;
1740     server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
1741     server.repl_scriptcache_fifo = listCreate();
1742 }
1743 
1744 /* Empty the script cache. Should be called every time we are no longer sure
1745  * that every slave knows about all the scripts in our set, or when the
1746  * current AOF "context" is no longer aware of the script. In general we
1747  * should flush the cache:
1748  *
1749  * 1) Every time a new slave reconnects to this master and performs a
1750  *    full SYNC (PSYNC does not require flushing).
1751  * 2) Every time an AOF rewrite is performed.
1752  * 3) Every time we are left without slaves at all, and AOF is off, in order
1753  *    to reclaim otherwise unused memory.
1754  */
1755 void replicationScriptCacheFlush(void) {
1756     dictEmpty(server.repl_scriptcache_dict,NULL);
1757     listRelease(server.repl_scriptcache_fifo);
1758     server.repl_scriptcache_fifo = listCreate();
1759 }
1760 
1761 /* Add an entry into the script cache, if we reach max number of entries the
1762  * oldest is removed from the list. */
1763 void replicationScriptCacheAdd(sds sha1) {
1764     int retval;
1765     sds key = sdsdup(sha1);
1766 
1767     /* Evict oldest. */
1768     if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
1769     {
1770         listNode *ln = listLast(server.repl_scriptcache_fifo);
1771         sds oldest = listNodeValue(ln);
1772 
1773         retval = dictDelete(server.repl_scriptcache_dict,oldest);
1774         redisAssert(retval == DICT_OK);
1775         listDelNode(server.repl_scriptcache_fifo,ln);
1776     }
1777 
1778     /* Add current. */
1779     retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
1780     listAddNodeHead(server.repl_scriptcache_fifo,key);
1781     redisAssert(retval == DICT_OK);
1782 }
1783 
1784 /* Returns non-zero if the specified entry exists inside the cache, that is,
1785  * if all the slaves are aware of this script SHA1. */
1786 int replicationScriptCacheExists(sds sha1) {
1787     return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
1788 }
1789 
1790 /* ----------------------- SYNCHRONOUS REPLICATION --------------------------
1791  * Redis synchronous replication design can be summarized in points:
1792  *
1793  * - Redis masters have a global replication offset, used by PSYNC.
1794  * - Master increment the offset every time new commands are sent to slaves.
1795  * - Slaves ping back masters with the offset processed so far.
1796  *
1797  * So synchronous replication adds a new WAIT command in the form:
1798  *
1799  *   WAIT <num_replicas> <milliseconds_timeout>
1800  *
1801  * That returns the number of replicas that processed the query when
1802  * we finally have at least num_replicas, or when the timeout was
1803  * reached.
1804  *
1805  * The command is implemented in this way:
1806  *
1807  * - Every time a client processes a command, we remember the replication
1808  *   offset after sending that command to the slaves.
1809  * - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
1810  *   The client is blocked at the same time (see blocked.c).
1811  * - Once we receive enough ACKs for a given offset or when the timeout
1812  *   is reached, the WAIT command is unblocked and the reply sent to the
1813  *   client.
1814  */
1815 
1816 /* This just set a flag so that we broadcast a REPLCONF GETACK command
1817  * to all the slaves in the beforeSleep() function. Note that this way
1818  * we "group" all the clients that want to wait for synchronouns replication
1819  * in a given event loop iteration, and send a single GETACK for them all. */
1820 void replicationRequestAckFromSlaves(void) {
1821     server.get_ack_from_slaves = 1;
1822 }
1823 
1824 /* Return the number of slaves that already acknowledged the specified
1825  * replication offset. */
1826 int replicationCountAcksByOffset(long long offset) {
1827     listIter li;
1828     listNode *ln;
1829     int count = 0;
1830 
1831     listRewind(server.slaves,&li);
1832     while((ln = listNext(&li))) {
1833         redisClient *slave = ln->value;
1834 
1835         if (slave->replstate != REDIS_REPL_ONLINE) continue;
1836         if (slave->repl_ack_off >= offset) count++;
1837     }
1838     return count;
1839 }
1840 
1841 /* WAIT for N replicas to acknowledge the processing of our latest
1842  * write command (and all the previous commands). */
1843 void waitCommand(redisClient *c) {
1844     mstime_t timeout;
1845     long numreplicas, ackreplicas;
1846     long long offset = c->woff;
1847 
1848     /* Argument parsing. */
1849     if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != REDIS_OK)
1850         return;
1851     if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
1852         != REDIS_OK) return;
1853 
1854     /* First try without blocking at all. */
1855     ackreplicas = replicationCountAcksByOffset(c->woff);
1856     if (ackreplicas >= numreplicas || c->flags & REDIS_MULTI) {
1857         addReplyLongLong(c,ackreplicas);
1858         return;
1859     }
1860 
1861     /* Otherwise block the client and put it into our list of clients
1862      * waiting for ack from slaves. */
1863     c->bpop.timeout = timeout;
1864     c->bpop.reploffset = offset;
1865     c->bpop.numreplicas = numreplicas;
1866     listAddNodeTail(server.clients_waiting_acks,c);
1867     blockClient(c,REDIS_BLOCKED_WAIT);
1868 
1869     /* Make sure that the server will send an ACK request to all the slaves
1870      * before returning to the event loop. */
1871     replicationRequestAckFromSlaves();
1872 }
1873 
1874 /* This is called by unblockClient() to perform the blocking op type
1875  * specific cleanup. We just remove the client from the list of clients
1876  * waiting for replica acks. Never call it directly, call unblockClient()
1877  * instead. */
1878 void unblockClientWaitingReplicas(redisClient *c) {
1879     listNode *ln = listSearchKey(server.clients_waiting_acks,c);
1880     redisAssert(ln != NULL);
1881     listDelNode(server.clients_waiting_acks,ln);
1882 }
1883 
1884 /* Check if there are clients blocked in WAIT that can be unblocked since
1885  * we received enough ACKs from slaves. */
1886 void processClientsWaitingReplicas(void) {
1887     long long last_offset = 0;
1888     int last_numreplicas = 0;
1889 
1890     listIter li;
1891     listNode *ln;
1892 
1893     listRewind(server.clients_waiting_acks,&li);
1894     while((ln = listNext(&li))) {
1895         redisClient *c = ln->value;
1896 
1897         /* Every time we find a client that is satisfied for a given
1898          * offset and number of replicas, we remember it so the next client
1899          * may be unblocked without calling replicationCountAcksByOffset()
1900          * if the requested offset / replicas were equal or less. */
1901         if (last_offset && last_offset > c->bpop.reploffset &&
1902                            last_numreplicas > c->bpop.numreplicas)
1903         {
1904             unblockClient(c);
1905             addReplyLongLong(c,last_numreplicas);
1906         } else {
1907             int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
1908 
1909             if (numreplicas >= c->bpop.numreplicas) {
1910                 last_offset = c->bpop.reploffset;
1911                 last_numreplicas = numreplicas;
1912                 unblockClient(c);
1913                 addReplyLongLong(c,numreplicas);
1914             }
1915         }
1916     }
1917 }
1918 
1919 /* Return the slave replication offset for this instance, that is
1920  * the offset for which we already processed the master replication stream. */
1921 long long replicationGetSlaveOffset(void) {
1922     long long offset = 0;
1923 
1924     if (server.masterhost != NULL) {
1925         if (server.master) {
1926             offset = server.master->reploff;
1927         } else if (server.cached_master) {
1928             offset = server.cached_master->reploff;
1929         }
1930     }
1931     /* offset may be -1 when the master does not support it at all, however
1932      * this function is designed to return an offset that can express the
1933      * amount of data processed by the master, so we return a positive
1934      * integer. */
1935     if (offset < 0) offset = 0;
1936     return offset;
1937 }
1938 
1939 /* --------------------------- REPLICATION CRON  ---------------------------- */
1940 
1941 /* Replication cron function, called 1 time per second. */
1942 void replicationCron(void) {
1943     /* Non blocking connection timeout? */
1944     if (server.masterhost &&
1945         (server.repl_state == REDIS_REPL_CONNECTING ||
1946          server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
1947         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
1948     {
1949         redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
1950         undoConnectWithMaster();
1951     }
1952 
1953     /* Bulk transfer I/O timeout? */
1954     if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER &&
1955         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
1956     {
1957         redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
1958         replicationAbortSyncTransfer();
1959     }
1960 
1961     /* Timed out master when we are an already connected slave? */
1962     if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
1963         (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
1964     {
1965         redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");
1966         freeClient(server.master);
1967     }
1968 
1969     /* Check if we should connect to a MASTER */
1970     if (server.repl_state == REDIS_REPL_CONNECT) {
1971         redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
1972             server.masterhost, server.masterport);
1973         if (connectWithMaster() == REDIS_OK) {
1974             redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
1975         }
1976     }
1977 
1978     /* Send ACK to master from time to time.
1979      * Note that we do not send periodic acks to masters that don't
1980      * support PSYNC and replication offsets. */
1981     if (server.masterhost && server.master &&
1982         !(server.master->flags & REDIS_PRE_PSYNC))
1983         replicationSendAck();
1984 
1985     /* If we have attached slaves, PING them from time to time.
1986      * So slaves can implement an explicit timeout to masters, and will
1987      * be able to detect a link disconnection even if the TCP connection
1988      * will not actually go down. */
1989     if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {
1990         listIter li;
1991         listNode *ln;
1992         robj *ping_argv[1];
1993 
1994         /* First, send PING */
1995         ping_argv[0] = createStringObject("PING",4);
1996         replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
1997         decrRefCount(ping_argv[0]);
1998 
1999         /* Second, send a newline to all the slaves in pre-synchronization
2000          * stage, that is, slaves waiting for the master to create the RDB file.
2001          * The newline will be ignored by the slave but will refresh the
2002          * last-io timer preventing a timeout. */
2003         listRewind(server.slaves,&li);
2004         while((ln = listNext(&li))) {
2005             redisClient *slave = ln->value;
2006 
2007             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
2008                 (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END &&
2009                  server.rdb_child_type != REDIS_RDB_CHILD_TYPE_SOCKET))
2010             {
2011                 if (write(slave->fd, "\n", 1) == -1) {
2012                     /* Don't worry, it's just a ping. */
2013                 }
2014             }
2015         }
2016     }
2017 
2018     /* Disconnect timedout slaves. */
2019     if (listLength(server.slaves)) {
2020         listIter li;
2021         listNode *ln;
2022 
2023         listRewind(server.slaves,&li);
2024         while((ln = listNext(&li))) {
2025             redisClient *slave = ln->value;
2026 
2027             if (slave->replstate != REDIS_REPL_ONLINE) continue;
2028             if (slave->flags & REDIS_PRE_PSYNC) continue;
2029             if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
2030             {
2031                 redisLog(REDIS_WARNING, "Disconnecting timedout slave: %s",
2032                     replicationGetSlaveName(slave));
2033                 freeClient(slave);
2034             }
2035         }
2036     }
2037 
2038     /* If we have no attached slaves and there is a replication backlog
2039      * using memory, free it after some (configured) time. */
2040     if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
2041         server.repl_backlog)
2042     {
2043         time_t idle = server.unixtime - server.repl_no_slaves_since;
2044 
2045         if (idle > server.repl_backlog_time_limit) {
2046             freeReplicationBacklog();
2047             redisLog(REDIS_NOTICE,
2048                 "Replication backlog freed after %d seconds "
2049                 "without connected slaves.",
2050                 (int) server.repl_backlog_time_limit);
2051         }
2052     }
2053 
2054     /* If AOF is disabled and we no longer have attached slaves, we can
2055      * free our Replication Script Cache as there is no need to propagate
2056      * EVALSHA at all. */
2057     if (listLength(server.slaves) == 0 &&
2058         server.aof_state == REDIS_AOF_OFF &&
2059         listLength(server.repl_scriptcache_fifo) != 0)
2060     {
2061         replicationScriptCacheFlush();
2062     }
2063 
2064     /* If we are using diskless replication and there are slaves waiting
2065      * in WAIT_BGSAVE_START state, check if enough seconds elapsed and
2066      * start a BGSAVE.
2067      *
2068      * This code is also useful to trigger a BGSAVE if the diskless
2069      * replication was turned off with CONFIG SET, while there were already
2070      * slaves in WAIT_BGSAVE_START state. */
2071     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
2072         time_t idle, max_idle = 0;
2073         int slaves_waiting = 0;
2074         listNode *ln;
2075         listIter li;
2076 
2077         listRewind(server.slaves,&li);
2078         while((ln = listNext(&li))) {
2079             redisClient *slave = ln->value;
2080             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
2081                 idle = server.unixtime - slave->lastinteraction;
2082                 if (idle > max_idle) max_idle = idle;
2083                 slaves_waiting++;
2084             }
2085         }
2086 
2087         if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) {
2088             /* Start a BGSAVE. Usually with socket target, or with disk target
2089              * if there was a recent socket -> disk config change. */
2090             if (startBgsaveForReplication() == REDIS_OK) {
2091                 /* It started! We need to change the state of slaves
2092                  * from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case
2093                  * the current target is disk. Otherwise it was already done
2094                  * by rdbSaveToSlavesSockets() which is called by
2095                  * startBgsaveForReplication(). */
2096                 listRewind(server.slaves,&li);
2097                 while((ln = listNext(&li))) {
2098                     redisClient *slave = ln->value;
2099                     if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
2100                         slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
2101                 }
2102             }
2103         }
2104     }
2105 
2106     /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
2107     refreshGoodSlavesCount();
2108 }
2109