xref: /redis-3.2.3/src/replication.c (revision 66e2bdf2)
1 /* Asynchronous replication implementation.
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 
32 #include "redis.h"
33 
34 #include <sys/time.h>
35 #include <unistd.h>
36 #include <fcntl.h>
37 #include <sys/socket.h>
38 #include <sys/stat.h>
39 
40 void replicationDiscardCachedMaster(void);
41 void replicationResurrectCachedMaster(int newfd);
42 void replicationSendAck(void);
43 void putSlaveOnline(redisClient *slave);
44 
45 /* --------------------------- Utility functions ---------------------------- */
46 
47 /* Return the pointer to a string representing the slave ip:listening_port
48  * pair. Mostly useful for logging, since we want to log a slave using its
49  * IP address and it's listening port which is more clear for the user, for
50  * example: "Closing connection with slave 10.1.2.3:6380". */
51 char *replicationGetSlaveName(redisClient *c) {
52     static char buf[REDIS_PEER_ID_LEN];
53     char ip[REDIS_IP_STR_LEN];
54 
55     ip[0] = '\0';
56     buf[0] = '\0';
57     if (anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1) {
58         if (c->slave_listening_port)
59             anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
60         else
61             snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip);
62     } else {
63         snprintf(buf,sizeof(buf),"client id #%llu",
64             (unsigned long long) c->id);
65     }
66     return buf;
67 }
68 
69 /* ---------------------------------- MASTER -------------------------------- */
70 
71 void createReplicationBacklog(void) {
72     redisAssert(server.repl_backlog == NULL);
73     server.repl_backlog = zmalloc(server.repl_backlog_size);
74     server.repl_backlog_histlen = 0;
75     server.repl_backlog_idx = 0;
76     /* When a new backlog buffer is created, we increment the replication
77      * offset by one to make sure we'll not be able to PSYNC with any
78      * previous slave. This is needed because we avoid incrementing the
79      * master_repl_offset if no backlog exists nor slaves are attached. */
80     server.master_repl_offset++;
81 
82     /* We don't have any data inside our buffer, but virtually the first
83      * byte we have is the next byte that will be generated for the
84      * replication stream. */
85     server.repl_backlog_off = server.master_repl_offset+1;
86 }
87 
88 /* This function is called when the user modifies the replication backlog
89  * size at runtime. It is up to the function to both update the
90  * server.repl_backlog_size and to resize the buffer and setup it so that
91  * it contains the same data as the previous one (possibly less data, but
92  * the most recent bytes, or the same data and more free space in case the
93  * buffer is enlarged). */
94 void resizeReplicationBacklog(long long newsize) {
95     if (newsize < REDIS_REPL_BACKLOG_MIN_SIZE)
96         newsize = REDIS_REPL_BACKLOG_MIN_SIZE;
97     if (server.repl_backlog_size == newsize) return;
98 
99     server.repl_backlog_size = newsize;
100     if (server.repl_backlog != NULL) {
101         /* What we actually do is to flush the old buffer and realloc a new
102          * empty one. It will refill with new data incrementally.
103          * The reason is that copying a few gigabytes adds latency and even
104          * worse often we need to alloc additional space before freeing the
105          * old buffer. */
106         zfree(server.repl_backlog);
107         server.repl_backlog = zmalloc(server.repl_backlog_size);
108         server.repl_backlog_histlen = 0;
109         server.repl_backlog_idx = 0;
110         /* Next byte we have is... the next since the buffer is empty. */
111         server.repl_backlog_off = server.master_repl_offset+1;
112     }
113 }
114 
115 void freeReplicationBacklog(void) {
116     redisAssert(listLength(server.slaves) == 0);
117     zfree(server.repl_backlog);
118     server.repl_backlog = NULL;
119 }
120 
121 /* Add data to the replication backlog.
122  * This function also increments the global replication offset stored at
123  * server.master_repl_offset, because there is no case where we want to feed
124  * the backlog without incrementing the buffer. */
125 void feedReplicationBacklog(void *ptr, size_t len) {
126     unsigned char *p = ptr;
127 
128     server.master_repl_offset += len;
129 
130     /* This is a circular buffer, so write as much data we can at every
131      * iteration and rewind the "idx" index if we reach the limit. */
132     while(len) {
133         size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
134         if (thislen > len) thislen = len;
135         memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
136         server.repl_backlog_idx += thislen;
137         if (server.repl_backlog_idx == server.repl_backlog_size)
138             server.repl_backlog_idx = 0;
139         len -= thislen;
140         p += thislen;
141         server.repl_backlog_histlen += thislen;
142     }
143     if (server.repl_backlog_histlen > server.repl_backlog_size)
144         server.repl_backlog_histlen = server.repl_backlog_size;
145     /* Set the offset of the first byte we have in the backlog. */
146     server.repl_backlog_off = server.master_repl_offset -
147                               server.repl_backlog_histlen + 1;
148 }
149 
150 /* Wrapper for feedReplicationBacklog() that takes Redis string objects
151  * as input. */
152 void feedReplicationBacklogWithObject(robj *o) {
153     char llstr[REDIS_LONGSTR_SIZE];
154     void *p;
155     size_t len;
156 
157     if (o->encoding == REDIS_ENCODING_INT) {
158         len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
159         p = llstr;
160     } else {
161         len = sdslen(o->ptr);
162         p = o->ptr;
163     }
164     feedReplicationBacklog(p,len);
165 }
166 
167 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
168     listNode *ln;
169     listIter li;
170     int j, len;
171     char llstr[REDIS_LONGSTR_SIZE];
172 
173     /* If there aren't slaves, and there is no backlog buffer to populate,
174      * we can return ASAP. */
175     if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
176 
177     /* We can't have slaves attached and no backlog. */
178     redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
179 
180     /* Send SELECT command to every slave if needed. */
181     if (server.slaveseldb != dictid) {
182         robj *selectcmd;
183 
184         /* For a few DBs we have pre-computed SELECT command. */
185         if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
186             selectcmd = shared.select[dictid];
187         } else {
188             int dictid_len;
189 
190             dictid_len = ll2string(llstr,sizeof(llstr),dictid);
191             selectcmd = createObject(REDIS_STRING,
192                 sdscatprintf(sdsempty(),
193                 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
194                 dictid_len, llstr));
195         }
196 
197         /* Add the SELECT command into the backlog. */
198         if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
199 
200         /* Send it to slaves. */
201         listRewind(slaves,&li);
202         while((ln = listNext(&li))) {
203             redisClient *slave = ln->value;
204             addReply(slave,selectcmd);
205         }
206 
207         if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
208             decrRefCount(selectcmd);
209     }
210     server.slaveseldb = dictid;
211 
212     /* Write the command to the replication backlog if any. */
213     if (server.repl_backlog) {
214         char aux[REDIS_LONGSTR_SIZE+3];
215 
216         /* Add the multi bulk reply length. */
217         aux[0] = '*';
218         len = ll2string(aux+1,sizeof(aux)-1,argc);
219         aux[len+1] = '\r';
220         aux[len+2] = '\n';
221         feedReplicationBacklog(aux,len+3);
222 
223         for (j = 0; j < argc; j++) {
224             long objlen = stringObjectLen(argv[j]);
225 
226             /* We need to feed the buffer with the object as a bulk reply
227              * not just as a plain string, so create the $..CRLF payload len
228              * and add the final CRLF */
229             aux[0] = '$';
230             len = ll2string(aux+1,sizeof(aux)-1,objlen);
231             aux[len+1] = '\r';
232             aux[len+2] = '\n';
233             feedReplicationBacklog(aux,len+3);
234             feedReplicationBacklogWithObject(argv[j]);
235             feedReplicationBacklog(aux+len+1,2);
236         }
237     }
238 
239     /* Write the command to every slave. */
240     listRewind(server.slaves,&li);
241     while((ln = listNext(&li))) {
242         redisClient *slave = ln->value;
243 
244         /* Don't feed slaves that are still waiting for BGSAVE to start */
245         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
246 
247         /* Feed slaves that are waiting for the initial SYNC (so these commands
248          * are queued in the output buffer until the initial SYNC completes),
249          * or are already in sync with the master. */
250 
251         /* Add the multi bulk length. */
252         addReplyMultiBulkLen(slave,argc);
253 
254         /* Finally any additional argument that was not stored inside the
255          * static buffer if any (from j to argc). */
256         for (j = 0; j < argc; j++)
257             addReplyBulk(slave,argv[j]);
258     }
259 }
260 
261 void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
262     listNode *ln;
263     listIter li;
264     int j;
265     sds cmdrepr = sdsnew("+");
266     robj *cmdobj;
267     struct timeval tv;
268 
269     gettimeofday(&tv,NULL);
270     cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
271     if (c->flags & REDIS_LUA_CLIENT) {
272         cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
273     } else if (c->flags & REDIS_UNIX_SOCKET) {
274         cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
275     } else {
276         cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
277     }
278 
279     for (j = 0; j < argc; j++) {
280         if (argv[j]->encoding == REDIS_ENCODING_INT) {
281             cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
282         } else {
283             cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
284                         sdslen(argv[j]->ptr));
285         }
286         if (j != argc-1)
287             cmdrepr = sdscatlen(cmdrepr," ",1);
288     }
289     cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
290     cmdobj = createObject(REDIS_STRING,cmdrepr);
291 
292     listRewind(monitors,&li);
293     while((ln = listNext(&li))) {
294         redisClient *monitor = ln->value;
295         addReply(monitor,cmdobj);
296     }
297     decrRefCount(cmdobj);
298 }
299 
300 /* Feed the slave 'c' with the replication backlog starting from the
301  * specified 'offset' up to the end of the backlog. */
302 long long addReplyReplicationBacklog(redisClient *c, long long offset) {
303     long long j, skip, len;
304 
305     redisLog(REDIS_DEBUG, "[PSYNC] Slave request offset: %lld", offset);
306 
307     if (server.repl_backlog_histlen == 0) {
308         redisLog(REDIS_DEBUG, "[PSYNC] Backlog history len is zero");
309         return 0;
310     }
311 
312     redisLog(REDIS_DEBUG, "[PSYNC] Backlog size: %lld",
313              server.repl_backlog_size);
314     redisLog(REDIS_DEBUG, "[PSYNC] First byte: %lld",
315              server.repl_backlog_off);
316     redisLog(REDIS_DEBUG, "[PSYNC] History len: %lld",
317              server.repl_backlog_histlen);
318     redisLog(REDIS_DEBUG, "[PSYNC] Current index: %lld",
319              server.repl_backlog_idx);
320 
321     /* Compute the amount of bytes we need to discard. */
322     skip = offset - server.repl_backlog_off;
323     redisLog(REDIS_DEBUG, "[PSYNC] Skipping: %lld", skip);
324 
325     /* Point j to the oldest byte, that is actaully our
326      * server.repl_backlog_off byte. */
327     j = (server.repl_backlog_idx +
328         (server.repl_backlog_size-server.repl_backlog_histlen)) %
329         server.repl_backlog_size;
330     redisLog(REDIS_DEBUG, "[PSYNC] Index of first byte: %lld", j);
331 
332     /* Discard the amount of data to seek to the specified 'offset'. */
333     j = (j + skip) % server.repl_backlog_size;
334 
335     /* Feed slave with data. Since it is a circular buffer we have to
336      * split the reply in two parts if we are cross-boundary. */
337     len = server.repl_backlog_histlen - skip;
338     redisLog(REDIS_DEBUG, "[PSYNC] Reply total length: %lld", len);
339     while(len) {
340         long long thislen =
341             ((server.repl_backlog_size - j) < len) ?
342             (server.repl_backlog_size - j) : len;
343 
344         redisLog(REDIS_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
345         addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
346         len -= thislen;
347         j = 0;
348     }
349     return server.repl_backlog_histlen - skip;
350 }
351 
352 /* This function handles the PSYNC command from the point of view of a
353  * master receiving a request for partial resynchronization.
354  *
355  * On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
356  * with the usual full resync. */
357 int masterTryPartialResynchronization(redisClient *c) {
358     long long psync_offset, psync_len;
359     char *master_runid = c->argv[1]->ptr;
360     char buf[128];
361     int buflen;
362 
363     /* Is the runid of this master the same advertised by the wannabe slave
364      * via PSYNC? If runid changed this master is a different instance and
365      * there is no way to continue. */
366     if (strcasecmp(master_runid, server.runid)) {
367         /* Run id "?" is used by slaves that want to force a full resync. */
368         if (master_runid[0] != '?') {
369             redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
370                 "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
371                 master_runid, server.runid);
372         } else {
373             redisLog(REDIS_NOTICE,"Full resync requested by slave %s",
374                 replicationGetSlaveName(c));
375         }
376         goto need_full_resync;
377     }
378 
379     /* We still have the data our slave is asking for? */
380     if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
381        REDIS_OK) goto need_full_resync;
382     if (!server.repl_backlog ||
383         psync_offset < server.repl_backlog_off ||
384         psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
385     {
386         redisLog(REDIS_NOTICE,
387             "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
388         if (psync_offset > server.master_repl_offset) {
389             redisLog(REDIS_WARNING,
390                 "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
391         }
392         goto need_full_resync;
393     }
394 
395     /* If we reached this point, we are able to perform a partial resync:
396      * 1) Set client state to make it a slave.
397      * 2) Inform the client we can continue with +CONTINUE
398      * 3) Send the backlog data (from the offset to the end) to the slave. */
399     c->flags |= REDIS_SLAVE;
400     c->replstate = REDIS_REPL_ONLINE;
401     c->repl_ack_time = server.unixtime;
402     c->repl_put_online_on_ack = 0;
403     listAddNodeTail(server.slaves,c);
404     /* We can't use the connection buffers since they are used to accumulate
405      * new commands at this stage. But we are sure the socket send buffer is
406      * empty so this write will never fail actually. */
407     buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
408     if (write(c->fd,buf,buflen) != buflen) {
409         freeClientAsync(c);
410         return REDIS_OK;
411     }
412     psync_len = addReplyReplicationBacklog(c,psync_offset);
413     redisLog(REDIS_NOTICE,
414         "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
415             replicationGetSlaveName(c),
416             psync_len, psync_offset);
417     /* Note that we don't need to set the selected DB at server.slaveseldb
418      * to -1 to force the master to emit SELECT, since the slave already
419      * has this state from the previous connection with the master. */
420 
421     refreshGoodSlavesCount();
422     return REDIS_OK; /* The caller can return, no full resync needed. */
423 
424 need_full_resync:
425     /* We need a full resync for some reason... notify the client. */
426     psync_offset = server.master_repl_offset;
427     /* Add 1 to psync_offset if it the replication backlog does not exists
428      * as when it will be created later we'll increment the offset by one. */
429     if (server.repl_backlog == NULL) psync_offset++;
430     /* Again, we can't use the connection buffers (see above). */
431     buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
432                       server.runid,psync_offset);
433     if (write(c->fd,buf,buflen) != buflen) {
434         freeClientAsync(c);
435         return REDIS_OK;
436     }
437     return REDIS_ERR;
438 }
439 
440 /* Start a BGSAVE for replication goals, which is, selecting the disk or
441  * socket target depending on the configuration, and making sure that
442  * the script cache is flushed before to start.
443  *
444  * Returns REDIS_OK on success or REDIS_ERR otherwise. */
445 int startBgsaveForReplication(void) {
446     int retval;
447 
448     redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC with target: %s",
449         server.repl_diskless_sync ? "slaves sockets" : "disk");
450 
451     if (server.repl_diskless_sync)
452         retval = rdbSaveToSlavesSockets();
453     else
454         retval = rdbSaveBackground(server.rdb_filename);
455 
456     /* Flush the script cache, since we need that slave differences are
457      * accumulated without requiring slaves to match our cached scripts. */
458     if (retval == REDIS_OK) replicationScriptCacheFlush();
459     return retval;
460 }
461 
462 /* SYNC and PSYNC command implemenation. */
463 void syncCommand(redisClient *c) {
464     /* ignore SYNC if already slave or in monitor mode */
465     if (c->flags & REDIS_SLAVE) return;
466 
467     /* Refuse SYNC requests if we are a slave but the link with our master
468      * is not ok... */
469     if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
470         addReplyError(c,"Can't SYNC while not connected with my master");
471         return;
472     }
473 
474     /* SYNC can't be issued when the server has pending data to send to
475      * the client about already issued commands. We need a fresh reply
476      * buffer registering the differences between the BGSAVE and the current
477      * dataset, so that we can copy to other slaves if needed. */
478     if (listLength(c->reply) != 0 || c->bufpos != 0) {
479         addReplyError(c,"SYNC and PSYNC are invalid with pending output");
480         return;
481     }
482 
483     redisLog(REDIS_NOTICE,"Slave %s asks for synchronization",
484         replicationGetSlaveName(c));
485 
486     /* Try a partial resynchronization if this is a PSYNC command.
487      * If it fails, we continue with usual full resynchronization, however
488      * when this happens masterTryPartialResynchronization() already
489      * replied with:
490      *
491      * +FULLRESYNC <runid> <offset>
492      *
493      * So the slave knows the new runid and offset to try a PSYNC later
494      * if the connection with the master is lost. */
495     if (!strcasecmp(c->argv[0]->ptr,"psync")) {
496         if (masterTryPartialResynchronization(c) == REDIS_OK) {
497             server.stat_sync_partial_ok++;
498             return; /* No full resync needed, return. */
499         } else {
500             char *master_runid = c->argv[1]->ptr;
501 
502             /* Increment stats for failed PSYNCs, but only if the
503              * runid is not "?", as this is used by slaves to force a full
504              * resync on purpose when they are not albe to partially
505              * resync. */
506             if (master_runid[0] != '?') server.stat_sync_partial_err++;
507         }
508     } else {
509         /* If a slave uses SYNC, we are dealing with an old implementation
510          * of the replication protocol (like redis-cli --slave). Flag the client
511          * so that we don't expect to receive REPLCONF ACK feedbacks. */
512         c->flags |= REDIS_PRE_PSYNC;
513     }
514 
515     /* Full resynchronization. */
516     server.stat_sync_full++;
517 
518     /* Here we need to check if there is a background saving operation
519      * in progress, or if it is required to start one */
520     if (server.rdb_child_pid != -1 &&
521         server.rdb_child_type == REDIS_RDB_CHILD_TYPE_DISK)
522     {
523         /* Ok a background save is in progress. Let's check if it is a good
524          * one for replication, i.e. if there is another slave that is
525          * registering differences since the server forked to save. */
526         redisClient *slave;
527         listNode *ln;
528         listIter li;
529 
530         listRewind(server.slaves,&li);
531         while((ln = listNext(&li))) {
532             slave = ln->value;
533             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
534         }
535         if (ln) {
536             /* Perfect, the server is already registering differences for
537              * another slave. Set the right state, and copy the buffer. */
538             copyClientOutputBuffer(c,slave);
539             c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
540             redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
541         } else {
542             /* No way, we need to wait for the next BGSAVE in order to
543              * register differences. */
544             c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
545             redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
546         }
547     } else if (server.rdb_child_pid != -1 &&
548                server.rdb_child_type == REDIS_RDB_CHILD_TYPE_SOCKET)
549     {
550         /* There is an RDB child process but it is writing directly to
551          * children sockets. We need to wait for the next BGSAVE
552          * in order to synchronize. */
553         c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
554         redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
555     } else {
556         if (server.repl_diskless_sync) {
557             /* Diskless replication RDB child is created inside
558              * replicationCron() since we want to delay its start a
559              * few seconds to wait for more slaves to arrive. */
560             c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
561             if (server.repl_diskless_sync_delay)
562                 redisLog(REDIS_NOTICE,"Delay next BGSAVE for SYNC");
563         } else {
564             /* Ok we don't have a BGSAVE in progress, let's start one. */
565             if (startBgsaveForReplication() != REDIS_OK) {
566                 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
567                 addReplyError(c,"Unable to perform background save");
568                 return;
569             }
570             c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
571         }
572     }
573 
574     if (server.repl_disable_tcp_nodelay)
575         anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
576     c->repldbfd = -1;
577     c->flags |= REDIS_SLAVE;
578     server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
579     listAddNodeTail(server.slaves,c);
580     if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
581         createReplicationBacklog();
582     return;
583 }
584 
585 /* REPLCONF <option> <value> <option> <value> ...
586  * This command is used by a slave in order to configure the replication
587  * process before starting it with the SYNC command.
588  *
589  * Currently the only use of this command is to communicate to the master
590  * what is the listening port of the Slave redis instance, so that the
591  * master can accurately list slaves and their listening ports in
592  * the INFO output.
593  *
594  * In the future the same command can be used in order to configure
595  * the replication to initiate an incremental replication instead of a
596  * full resync. */
597 void replconfCommand(redisClient *c) {
598     int j;
599 
600     if ((c->argc % 2) == 0) {
601         /* Number of arguments must be odd to make sure that every
602          * option has a corresponding value. */
603         addReply(c,shared.syntaxerr);
604         return;
605     }
606 
607     /* Process every option-value pair. */
608     for (j = 1; j < c->argc; j+=2) {
609         if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
610             long port;
611 
612             if ((getLongFromObjectOrReply(c,c->argv[j+1],
613                     &port,NULL) != REDIS_OK))
614                 return;
615             c->slave_listening_port = port;
616         } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
617             /* REPLCONF ACK is used by slave to inform the master the amount
618              * of replication stream that it processed so far. It is an
619              * internal only command that normal clients should never use. */
620             long long offset;
621 
622             if (!(c->flags & REDIS_SLAVE)) return;
623             if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK))
624                 return;
625             if (offset > c->repl_ack_off)
626                 c->repl_ack_off = offset;
627             c->repl_ack_time = server.unixtime;
628             /* If this was a diskless replication, we need to really put
629              * the slave online when the first ACK is received (which
630              * confirms slave is online and ready to get more data). */
631             if (c->repl_put_online_on_ack && c->replstate == REDIS_REPL_ONLINE)
632                 putSlaveOnline(c);
633             /* Note: this command does not reply anything! */
634             return;
635         } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
636             /* REPLCONF GETACK is used in order to request an ACK ASAP
637              * to the slave. */
638             if (server.masterhost && server.master) replicationSendAck();
639             /* Note: this command does not reply anything! */
640         } else {
641             addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
642                 (char*)c->argv[j]->ptr);
643             return;
644         }
645     }
646     addReply(c,shared.ok);
647 }
648 
649 /* This function puts a slave in the online state, and should be called just
650  * after a slave received the RDB file for the initial synchronization, and
651  * we are finally ready to send the incremental stream of commands.
652  *
653  * It does a few things:
654  *
655  * 1) Put the slave in ONLINE state.
656  * 2) Make sure the writable event is re-installed, since calling the SYNC
657  *    command disables it, so that we can accumulate output buffer without
658  *    sending it to the slave.
659  * 3) Update the count of good slaves. */
660 void putSlaveOnline(redisClient *slave) {
661     slave->replstate = REDIS_REPL_ONLINE;
662     slave->repl_put_online_on_ack = 0;
663     slave->repl_ack_time = server.unixtime;
664     if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
665         sendReplyToClient, slave) == AE_ERR) {
666         redisLog(REDIS_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
667         freeClient(slave);
668         return;
669     }
670     refreshGoodSlavesCount();
671     redisLog(REDIS_NOTICE,"Synchronization with slave %s succeeded",
672         replicationGetSlaveName(slave));
673 }
674 
675 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
676     redisClient *slave = privdata;
677     REDIS_NOTUSED(el);
678     REDIS_NOTUSED(mask);
679     char buf[REDIS_IOBUF_LEN];
680     ssize_t nwritten, buflen;
681 
682     /* Before sending the RDB file, we send the preamble as configured by the
683      * replication process. Currently the preamble is just the bulk count of
684      * the file in the form "$<length>\r\n". */
685     if (slave->replpreamble) {
686         nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
687         if (nwritten == -1) {
688             redisLog(REDIS_VERBOSE,"Write error sending RDB preamble to slave: %s",
689                 strerror(errno));
690             freeClient(slave);
691             return;
692         }
693         server.stat_net_output_bytes += nwritten;
694         sdsrange(slave->replpreamble,nwritten,-1);
695         if (sdslen(slave->replpreamble) == 0) {
696             sdsfree(slave->replpreamble);
697             slave->replpreamble = NULL;
698             /* fall through sending data. */
699         } else {
700             return;
701         }
702     }
703 
704     /* If the preamble was already transfered, send the RDB bulk data. */
705     lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
706     buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
707     if (buflen <= 0) {
708         redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
709             (buflen == 0) ? "premature EOF" : strerror(errno));
710         freeClient(slave);
711         return;
712     }
713     if ((nwritten = write(fd,buf,buflen)) == -1) {
714         if (errno != EAGAIN) {
715             redisLog(REDIS_WARNING,"Write error sending DB to slave: %s",
716                 strerror(errno));
717             freeClient(slave);
718         }
719         return;
720     }
721     slave->repldboff += nwritten;
722     server.stat_net_output_bytes += nwritten;
723     if (slave->repldboff == slave->repldbsize) {
724         close(slave->repldbfd);
725         slave->repldbfd = -1;
726         aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
727         putSlaveOnline(slave);
728     }
729 }
730 
731 /* This function is called at the end of every background saving,
732  * or when the replication RDB transfer strategy is modified from
733  * disk to socket or the other way around.
734  *
735  * The goal of this function is to handle slaves waiting for a successful
736  * background saving in order to perform non-blocking synchronization, and
737  * to schedule a new BGSAVE if there are slaves that attached while a
738  * BGSAVE was in progress, but it was not a good one for replication (no
739  * other slave was accumulating differences).
740  *
741  * The argument bgsaveerr is REDIS_OK if the background saving succeeded
742  * otherwise REDIS_ERR is passed to the function.
743  * The 'type' argument is the type of the child that terminated
744  * (if it had a disk or socket target). */
745 void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
746     listNode *ln;
747     int startbgsave = 0;
748     listIter li;
749 
750     listRewind(server.slaves,&li);
751     while((ln = listNext(&li))) {
752         redisClient *slave = ln->value;
753 
754         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
755             startbgsave = 1;
756             slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
757         } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
758             struct redis_stat buf;
759 
760             /* If this was an RDB on disk save, we have to prepare to send
761              * the RDB from disk to the slave socket. Otherwise if this was
762              * already an RDB -> Slaves socket transfer, used in the case of
763              * diskless replication, our work is trivial, we can just put
764              * the slave online. */
765             if (type == REDIS_RDB_CHILD_TYPE_SOCKET) {
766                 redisLog(REDIS_NOTICE,
767                     "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
768                         replicationGetSlaveName(slave));
769                 /* Note: we wait for a REPLCONF ACK message from slave in
770                  * order to really put it online (install the write handler
771                  * so that the accumulated data can be transfered). However
772                  * we change the replication state ASAP, since our slave
773                  * is technically online now. */
774                 slave->replstate = REDIS_REPL_ONLINE;
775                 slave->repl_put_online_on_ack = 1;
776             } else {
777                 if (bgsaveerr != REDIS_OK) {
778                     freeClient(slave);
779                     redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
780                     continue;
781                 }
782                 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
783                     redis_fstat(slave->repldbfd,&buf) == -1) {
784                     freeClient(slave);
785                     redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
786                     continue;
787                 }
788                 slave->repldboff = 0;
789                 slave->repldbsize = buf.st_size;
790                 slave->replstate = REDIS_REPL_SEND_BULK;
791                 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
792                     (unsigned long long) slave->repldbsize);
793 
794                 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
795                 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
796                     freeClient(slave);
797                     continue;
798                 }
799             }
800         }
801     }
802     if (startbgsave) {
803         if (startBgsaveForReplication() != REDIS_OK) {
804             listIter li;
805 
806             listRewind(server.slaves,&li);
807             redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
808             while((ln = listNext(&li))) {
809                 redisClient *slave = ln->value;
810 
811                 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
812                     freeClient(slave);
813             }
814         }
815     }
816 }
817 
818 /* ----------------------------------- SLAVE -------------------------------- */
819 
820 /* Abort the async download of the bulk dataset while SYNC-ing with master */
821 void replicationAbortSyncTransfer(void) {
822     redisAssert(server.repl_state == REDIS_REPL_TRANSFER);
823 
824     aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
825     close(server.repl_transfer_s);
826     close(server.repl_transfer_fd);
827     unlink(server.repl_transfer_tmpfile);
828     zfree(server.repl_transfer_tmpfile);
829     server.repl_state = REDIS_REPL_CONNECT;
830 }
831 
832 /* Avoid the master to detect the slave is timing out while loading the
833  * RDB file in initial synchronization. We send a single newline character
834  * that is valid protocol but is guaranteed to either be sent entierly or
835  * not, since the byte is indivisible.
836  *
837  * The function is called in two contexts: while we flush the current
838  * data with emptyDb(), and while we load the new data received as an
839  * RDB file from the master. */
840 void replicationSendNewlineToMaster(void) {
841     static time_t newline_sent;
842     if (time(NULL) != newline_sent) {
843         newline_sent = time(NULL);
844         if (write(server.repl_transfer_s,"\n",1) == -1) {
845             /* Pinging back in this stage is best-effort. */
846         }
847     }
848 }
849 
/* Progress callback handed to emptyDb() while the old dataset is flushed to
 * make room for the one received from the master. It only forwards to
 * replicationSendNewlineToMaster(); 'privdata' is ignored. */
void replicationEmptyDbCallback(void *privdata) {
    (void) privdata;
    replicationSendNewlineToMaster();
}
856 
857 /* Asynchronously read the SYNC payload we receive from a master */
858 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
859 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
860     char buf[4096];
861     ssize_t nread, readlen;
862     off_t left;
863     REDIS_NOTUSED(el);
864     REDIS_NOTUSED(privdata);
865     REDIS_NOTUSED(mask);
866 
867     /* Static vars used to hold the EOF mark, and the last bytes received
868      * form the server: when they match, we reached the end of the transfer. */
869     static char eofmark[REDIS_RUN_ID_SIZE];
870     static char lastbytes[REDIS_RUN_ID_SIZE];
871     static int usemark = 0;
872 
873     /* If repl_transfer_size == -1 we still have to read the bulk length
874      * from the master reply. */
875     if (server.repl_transfer_size == -1) {
876         if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
877             redisLog(REDIS_WARNING,
878                 "I/O error reading bulk count from MASTER: %s",
879                 strerror(errno));
880             goto error;
881         }
882 
883         if (buf[0] == '-') {
884             redisLog(REDIS_WARNING,
885                 "MASTER aborted replication with an error: %s",
886                 buf+1);
887             goto error;
888         } else if (buf[0] == '\0') {
889             /* At this stage just a newline works as a PING in order to take
890              * the connection live. So we refresh our last interaction
891              * timestamp. */
892             server.repl_transfer_lastio = server.unixtime;
893             return;
894         } else if (buf[0] != '$') {
895             redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
896             goto error;
897         }
898 
899         /* There are two possible forms for the bulk payload. One is the
900          * usual $<count> bulk format. The other is used for diskless transfers
901          * when the master does not know beforehand the size of the file to
902          * transfer. In the latter case, the following format is used:
903          *
904          * $EOF:<40 bytes delimiter>
905          *
906          * At the end of the file the announced delimiter is transmitted. The
907          * delimiter is long and random enough that the probability of a
908          * collision with the actual file content can be ignored. */
909         if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= REDIS_RUN_ID_SIZE) {
910             usemark = 1;
911             memcpy(eofmark,buf+5,REDIS_RUN_ID_SIZE);
912             memset(lastbytes,0,REDIS_RUN_ID_SIZE);
913             /* Set any repl_transfer_size to avoid entering this code path
914              * at the next call. */
915             server.repl_transfer_size = 0;
916             redisLog(REDIS_NOTICE,
917                 "MASTER <-> SLAVE sync: receiving streamed RDB from master");
918         } else {
919             usemark = 0;
920             server.repl_transfer_size = strtol(buf+1,NULL,10);
921             redisLog(REDIS_NOTICE,
922                 "MASTER <-> SLAVE sync: receiving %lld bytes from master",
923                 (long long) server.repl_transfer_size);
924         }
925         return;
926     }
927 
928     /* Read bulk data */
929     if (usemark) {
930         readlen = sizeof(buf);
931     } else {
932         left = server.repl_transfer_size - server.repl_transfer_read;
933         readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
934     }
935 
936     nread = read(fd,buf,readlen);
937     if (nread <= 0) {
938         redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
939             (nread == -1) ? strerror(errno) : "connection lost");
940         replicationAbortSyncTransfer();
941         return;
942     }
943     server.stat_net_input_bytes += nread;
944 
945     /* When a mark is used, we want to detect EOF asap in order to avoid
946      * writing the EOF mark into the file... */
947     int eof_reached = 0;
948 
949     if (usemark) {
950         /* Update the last bytes array, and check if it matches our delimiter.*/
951         if (nread >= REDIS_RUN_ID_SIZE) {
952             memcpy(lastbytes,buf+nread-REDIS_RUN_ID_SIZE,REDIS_RUN_ID_SIZE);
953         } else {
954             int rem = REDIS_RUN_ID_SIZE-nread;
955             memmove(lastbytes,lastbytes+nread,rem);
956             memcpy(lastbytes+rem,buf,nread);
957         }
958         if (memcmp(lastbytes,eofmark,REDIS_RUN_ID_SIZE) == 0) eof_reached = 1;
959     }
960 
961     server.repl_transfer_lastio = server.unixtime;
962     if (write(server.repl_transfer_fd,buf,nread) != nread) {
963         redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
964         goto error;
965     }
966     server.repl_transfer_read += nread;
967 
968     /* Delete the last 40 bytes from the file if we reached EOF. */
969     if (usemark && eof_reached) {
970         if (ftruncate(server.repl_transfer_fd,
971             server.repl_transfer_read - REDIS_RUN_ID_SIZE) == -1)
972         {
973             redisLog(REDIS_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
974             goto error;
975         }
976     }
977 
978     /* Sync data on disk from time to time, otherwise at the end of the transfer
979      * we may suffer a big delay as the memory buffers are copied into the
980      * actual disk. */
981     if (server.repl_transfer_read >=
982         server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
983     {
984         off_t sync_size = server.repl_transfer_read -
985                           server.repl_transfer_last_fsync_off;
986         rdb_fsync_range(server.repl_transfer_fd,
987             server.repl_transfer_last_fsync_off, sync_size);
988         server.repl_transfer_last_fsync_off += sync_size;
989     }
990 
991     /* Check if the transfer is now complete */
992     if (!usemark) {
993         if (server.repl_transfer_read == server.repl_transfer_size)
994             eof_reached = 1;
995     }
996 
997     if (eof_reached) {
998         if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
999             redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
1000             replicationAbortSyncTransfer();
1001             return;
1002         }
1003         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
1004         signalFlushedDb(-1);
1005         emptyDb(replicationEmptyDbCallback);
1006         /* Before loading the DB into memory we need to delete the readable
1007          * handler, otherwise it will get called recursively since
1008          * rdbLoad() will call the event loop to process events from time to
1009          * time for non blocking loading. */
1010         aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
1011         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
1012         if (rdbLoad(server.rdb_filename) != REDIS_OK) {
1013             redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
1014             replicationAbortSyncTransfer();
1015             return;
1016         }
1017         /* Final setup of the connected slave <- master link */
1018         zfree(server.repl_transfer_tmpfile);
1019         close(server.repl_transfer_fd);
1020         server.master = createClient(server.repl_transfer_s);
1021         server.master->flags |= REDIS_MASTER;
1022         server.master->authenticated = 1;
1023         server.repl_state = REDIS_REPL_CONNECTED;
1024         server.master->reploff = server.repl_master_initial_offset;
1025         memcpy(server.master->replrunid, server.repl_master_runid,
1026             sizeof(server.repl_master_runid));
1027         /* If master offset is set to -1, this master is old and is not
1028          * PSYNC capable, so we flag it accordingly. */
1029         if (server.master->reploff == -1)
1030             server.master->flags |= REDIS_PRE_PSYNC;
1031         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
1032         /* Restart the AOF subsystem now that we finished the sync. This
1033          * will trigger an AOF rewrite, and when done will start appending
1034          * to the new file. */
1035         if (server.aof_state != REDIS_AOF_OFF) {
1036             int retry = 10;
1037 
1038             stopAppendOnly();
1039             while (retry-- && startAppendOnly() == REDIS_ERR) {
1040                 redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
1041                 sleep(1);
1042             }
1043             if (!retry) {
1044                 redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
1045                 exit(1);
1046             }
1047         }
1048     }
1049 
1050     return;
1051 
1052 error:
1053     replicationAbortSyncTransfer();
1054     return;
1055 }
1056 
1057 /* Send a synchronous command to the master. Used to send AUTH and
1058  * REPLCONF commands before starting the replication with SYNC.
1059  *
1060  * The command returns an sds string representing the result of the
1061  * operation. On error the first byte is a "-".
1062  */
1063 char *sendSynchronousCommand(int fd, ...) {
1064     va_list ap;
1065     sds cmd = sdsempty();
1066     char *arg, buf[256];
1067 
1068     /* Create the command to send to the master, we use simple inline
1069      * protocol for simplicity as currently we only send simple strings. */
1070     va_start(ap,fd);
1071     while(1) {
1072         arg = va_arg(ap, char*);
1073         if (arg == NULL) break;
1074 
1075         if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
1076         cmd = sdscat(cmd,arg);
1077     }
1078     cmd = sdscatlen(cmd,"\r\n",2);
1079 
1080     /* Transfer command to the server. */
1081     if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
1082         sdsfree(cmd);
1083         return sdscatprintf(sdsempty(),"-Writing to master: %s",
1084                 strerror(errno));
1085     }
1086     sdsfree(cmd);
1087 
1088     /* Read the reply from the server. */
1089     if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
1090     {
1091         return sdscatprintf(sdsempty(),"-Reading from master: %s",
1092                 strerror(errno));
1093     }
1094     return sdsnew(buf);
1095 }
1096 
1097 /* Try a partial resynchronization with the master if we are about to reconnect.
1098  * If there is no cached master structure, at least try to issue a
1099  * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
1100  * command in order to obtain the master run id and the master replication
1101  * global offset.
1102  *
1103  * This function is designed to be called from syncWithMaster(), so the
1104  * following assumptions are made:
1105  *
1106  * 1) We pass the function an already connected socket "fd".
1107  * 2) This function does not close the file descriptor "fd". However in case
1108  *    of successful partial resynchronization, the function will reuse
1109  *    'fd' as file descriptor of the server.master client structure.
1110  *
1111  * The function returns:
1112  *
1113  * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue.
1114  * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
1115  *                   In this case the master run_id and global replication
1116  *                   offset is saved.
1117  * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
1118  *                      the caller should fall back to SYNC.
1119  */
1120 
1121 #define PSYNC_CONTINUE 0
1122 #define PSYNC_FULLRESYNC 1
1123 #define PSYNC_NOT_SUPPORTED 2
1124 int slaveTryPartialResynchronization(int fd) {
1125     char *psync_runid;
1126     char psync_offset[32];
1127     sds reply;
1128 
1129     /* Initially set repl_master_initial_offset to -1 to mark the current
1130      * master run_id and offset as not valid. Later if we'll be able to do
1131      * a FULL resync using the PSYNC command we'll set the offset at the
1132      * right value, so that this information will be propagated to the
1133      * client structure representing the master into server.master. */
1134     server.repl_master_initial_offset = -1;
1135 
1136     if (server.cached_master) {
1137         psync_runid = server.cached_master->replrunid;
1138         snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
1139         redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
1140     } else {
1141         redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
1142         psync_runid = "?";
1143         memcpy(psync_offset,"-1",3);
1144     }
1145 
1146     /* Issue the PSYNC command */
1147     reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
1148 
1149     if (!strncmp(reply,"+FULLRESYNC",11)) {
1150         char *runid = NULL, *offset = NULL;
1151 
1152         /* FULL RESYNC, parse the reply in order to extract the run id
1153          * and the replication offset. */
1154         runid = strchr(reply,' ');
1155         if (runid) {
1156             runid++;
1157             offset = strchr(runid,' ');
1158             if (offset) offset++;
1159         }
1160         if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
1161             redisLog(REDIS_WARNING,
1162                 "Master replied with wrong +FULLRESYNC syntax.");
1163             /* This is an unexpected condition, actually the +FULLRESYNC
1164              * reply means that the master supports PSYNC, but the reply
1165              * format seems wrong. To stay safe we blank the master
1166              * runid to make sure next PSYNCs will fail. */
1167             memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
1168         } else {
1169             memcpy(server.repl_master_runid, runid, offset-runid-1);
1170             server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
1171             server.repl_master_initial_offset = strtoll(offset,NULL,10);
1172             redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
1173                 server.repl_master_runid,
1174                 server.repl_master_initial_offset);
1175         }
1176         /* We are going to full resync, discard the cached master structure. */
1177         replicationDiscardCachedMaster();
1178         sdsfree(reply);
1179         return PSYNC_FULLRESYNC;
1180     }
1181 
1182     if (!strncmp(reply,"+CONTINUE",9)) {
1183         /* Partial resync was accepted, set the replication state accordingly */
1184         redisLog(REDIS_NOTICE,
1185             "Successful partial resynchronization with master.");
1186         sdsfree(reply);
1187         replicationResurrectCachedMaster(fd);
1188         return PSYNC_CONTINUE;
1189     }
1190 
1191     /* If we reach this point we receied either an error since the master does
1192      * not understand PSYNC, or an unexpected reply from the master.
1193      * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
1194 
1195     if (strncmp(reply,"-ERR",4)) {
1196         /* If it's not an error, log the unexpected event. */
1197         redisLog(REDIS_WARNING,
1198             "Unexpected reply to PSYNC from master: %s", reply);
1199     } else {
1200         redisLog(REDIS_NOTICE,
1201             "Master does not support PSYNC or is in "
1202             "error state (reply: %s)", reply);
1203     }
1204     sdsfree(reply);
1205     replicationDiscardCachedMaster();
1206     return PSYNC_NOT_SUPPORTED;
1207 }
1208 
1209 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
1210     char tmpfile[256], *err;
1211     int dfd, maxtries = 5;
1212     int sockerr = 0, psync_result;
1213     socklen_t errlen = sizeof(sockerr);
1214     REDIS_NOTUSED(el);
1215     REDIS_NOTUSED(privdata);
1216     REDIS_NOTUSED(mask);
1217 
1218     /* If this event fired after the user turned the instance into a master
1219      * with SLAVEOF NO ONE we must just return ASAP. */
1220     if (server.repl_state == REDIS_REPL_NONE) {
1221         close(fd);
1222         return;
1223     }
1224 
1225     /* Check for errors in the socket. */
1226     if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
1227         sockerr = errno;
1228     if (sockerr) {
1229         aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
1230         redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
1231             strerror(sockerr));
1232         goto error;
1233     }
1234 
1235     /* If we were connecting, it's time to send a non blocking PING, we want to
1236      * make sure the master is able to reply before going into the actual
1237      * replication process where we have long timeouts in the order of
1238      * seconds (in the meantime the slave would block). */
1239     if (server.repl_state == REDIS_REPL_CONNECTING) {
1240         redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
1241         /* Delete the writable event so that the readable event remains
1242          * registered and we can wait for the PONG reply. */
1243         aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
1244         server.repl_state = REDIS_REPL_RECEIVE_PONG;
1245         /* Send the PING, don't check for errors at all, we have the timeout
1246          * that will take care about this. */
1247         syncWrite(fd,"PING\r\n",6,100);
1248         return;
1249     }
1250 
1251     /* Receive the PONG command. */
1252     if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
1253         char buf[1024];
1254 
1255         /* Delete the readable event, we no longer need it now that there is
1256          * the PING reply to read. */
1257         aeDeleteFileEvent(server.el,fd,AE_READABLE);
1258 
1259         /* Read the reply with explicit timeout. */
1260         buf[0] = '\0';
1261         if (syncReadLine(fd,buf,sizeof(buf),
1262             server.repl_syncio_timeout*1000) == -1)
1263         {
1264             redisLog(REDIS_WARNING,
1265                 "I/O error reading PING reply from master: %s",
1266                 strerror(errno));
1267             goto error;
1268         }
1269 
1270         /* We accept only two replies as valid, a positive +PONG reply
1271          * (we just check for "+") or an authentication error.
1272          * Note that older versions of Redis replied with "operation not
1273          * permitted" instead of using a proper error code, so we test
1274          * both. */
1275         if (buf[0] != '+' &&
1276             strncmp(buf,"-NOAUTH",7) != 0 &&
1277             strncmp(buf,"-ERR operation not permitted",28) != 0)
1278         {
1279             redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
1280             goto error;
1281         } else {
1282             redisLog(REDIS_NOTICE,
1283                 "Master replied to PING, replication can continue...");
1284         }
1285     }
1286 
1287     /* AUTH with the master if required. */
1288     if(server.masterauth) {
1289         err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
1290         if (err[0] == '-') {
1291             redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
1292             sdsfree(err);
1293             goto error;
1294         }
1295         sdsfree(err);
1296     }
1297 
1298     /* Set the slave port, so that Master's INFO command can list the
1299      * slave listening port correctly. */
1300     {
1301         sds port = sdsfromlonglong(server.port);
1302         err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
1303                                          NULL);
1304         sdsfree(port);
1305         /* Ignore the error if any, not all the Redis versions support
1306          * REPLCONF listening-port. */
1307         if (err[0] == '-') {
1308             redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
1309         }
1310         sdsfree(err);
1311     }
1312 
1313     /* Try a partial resynchonization. If we don't have a cached master
1314      * slaveTryPartialResynchronization() will at least try to use PSYNC
1315      * to start a full resynchronization so that we get the master run id
1316      * and the global offset, to try a partial resync at the next
1317      * reconnection attempt. */
1318     psync_result = slaveTryPartialResynchronization(fd);
1319     if (psync_result == PSYNC_CONTINUE) {
1320         redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
1321         return;
1322     }
1323 
1324     /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
1325      * and the server.repl_master_runid and repl_master_initial_offset are
1326      * already populated. */
1327     if (psync_result == PSYNC_NOT_SUPPORTED) {
1328         redisLog(REDIS_NOTICE,"Retrying with SYNC...");
1329         if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
1330             redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
1331                 strerror(errno));
1332             goto error;
1333         }
1334     }
1335 
1336     /* Prepare a suitable temp file for bulk transfer */
1337     while(maxtries--) {
1338         snprintf(tmpfile,256,
1339             "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
1340         dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
1341         if (dfd != -1) break;
1342         sleep(1);
1343     }
1344     if (dfd == -1) {
1345         redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
1346         goto error;
1347     }
1348 
1349     /* Setup the non blocking download of the bulk file. */
1350     if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
1351             == AE_ERR)
1352     {
1353         redisLog(REDIS_WARNING,
1354             "Can't create readable event for SYNC: %s (fd=%d)",
1355             strerror(errno),fd);
1356         goto error;
1357     }
1358 
1359     server.repl_state = REDIS_REPL_TRANSFER;
1360     server.repl_transfer_size = -1;
1361     server.repl_transfer_read = 0;
1362     server.repl_transfer_last_fsync_off = 0;
1363     server.repl_transfer_fd = dfd;
1364     server.repl_transfer_lastio = server.unixtime;
1365     server.repl_transfer_tmpfile = zstrdup(tmpfile);
1366     return;
1367 
1368 error:
1369     close(fd);
1370     server.repl_transfer_s = -1;
1371     server.repl_state = REDIS_REPL_CONNECT;
1372     return;
1373 }
1374 
1375 int connectWithMaster(void) {
1376     int fd;
1377 
1378     fd = anetTcpNonBlockBindConnect(NULL,
1379         server.masterhost,server.masterport,REDIS_BIND_ADDR);
1380     if (fd == -1) {
1381         redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
1382             strerror(errno));
1383         return REDIS_ERR;
1384     }
1385 
1386     if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
1387             AE_ERR)
1388     {
1389         close(fd);
1390         redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
1391         return REDIS_ERR;
1392     }
1393 
1394     server.repl_transfer_lastio = server.unixtime;
1395     server.repl_transfer_s = fd;
1396     server.repl_state = REDIS_REPL_CONNECTING;
1397     return REDIS_OK;
1398 }
1399 
1400 /* This function can be called when a non blocking connection is currently
1401  * in progress to undo it. */
1402 void undoConnectWithMaster(void) {
1403     int fd = server.repl_transfer_s;
1404 
1405     redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
1406                 server.repl_state == REDIS_REPL_RECEIVE_PONG);
1407     aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
1408     close(fd);
1409     server.repl_transfer_s = -1;
1410     server.repl_state = REDIS_REPL_CONNECT;
1411 }
1412 
1413 /* This function aborts a non blocking replication attempt if there is one
1414  * in progress, by canceling the non-blocking connect attempt or
1415  * the initial bulk transfer.
1416  *
1417  * If there was a replication handshake in progress 1 is returned and
1418  * the replication state (server.repl_state) set to REDIS_REPL_CONNECT.
1419  *
1420  * Otherwise zero is returned and no operation is perforemd at all. */
1421 int cancelReplicationHandshake(void) {
1422     if (server.repl_state == REDIS_REPL_TRANSFER) {
1423         replicationAbortSyncTransfer();
1424     } else if (server.repl_state == REDIS_REPL_CONNECTING ||
1425              server.repl_state == REDIS_REPL_RECEIVE_PONG)
1426     {
1427         undoConnectWithMaster();
1428     } else {
1429         return 0;
1430     }
1431     return 1;
1432 }
1433 
1434 /* Set replication to the specified master address and port. */
1435 void replicationSetMaster(char *ip, int port) {
1436     sdsfree(server.masterhost);
1437     server.masterhost = sdsnew(ip);
1438     server.masterport = port;
1439     if (server.master) freeClient(server.master);
1440     disconnectSlaves(); /* Force our slaves to resync with us as well. */
1441     replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
1442     freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
1443     cancelReplicationHandshake();
1444     server.repl_state = REDIS_REPL_CONNECT;
1445     server.master_repl_offset = 0;
1446     server.repl_down_since = 0;
1447 }
1448 
1449 /* Cancel replication, setting the instance as a master itself. */
1450 void replicationUnsetMaster(void) {
1451     if (server.masterhost == NULL) return; /* Nothing to do. */
1452     sdsfree(server.masterhost);
1453     server.masterhost = NULL;
1454     if (server.master) {
1455         if (listLength(server.slaves) == 0) {
1456             /* If this instance is turned into a master and there are no
1457              * slaves, it inherits the replication offset from the master.
1458              * Under certain conditions this makes replicas comparable by
1459              * replication offset to understand what is the most updated. */
1460             server.master_repl_offset = server.master->reploff;
1461             freeReplicationBacklog();
1462         }
1463         freeClient(server.master);
1464     }
1465     replicationDiscardCachedMaster();
1466     cancelReplicationHandshake();
1467     server.repl_state = REDIS_REPL_NONE;
1468 }
1469 
1470 void slaveofCommand(redisClient *c) {
1471     /* SLAVEOF is not allowed in cluster mode as replication is automatically
1472      * configured using the current address of the master node. */
1473     if (server.cluster_enabled) {
1474         addReplyError(c,"SLAVEOF not allowed in cluster mode.");
1475         return;
1476     }
1477 
1478     /* The special host/port combination "NO" "ONE" turns the instance
1479      * into a master. Otherwise the new master address is set. */
1480     if (!strcasecmp(c->argv[1]->ptr,"no") &&
1481         !strcasecmp(c->argv[2]->ptr,"one")) {
1482         if (server.masterhost) {
1483             replicationUnsetMaster();
1484             redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
1485         }
1486     } else {
1487         long port;
1488 
1489         if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
1490             return;
1491 
1492         /* Check if we are already attached to the specified slave */
1493         if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
1494             && server.masterport == port) {
1495             redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
1496             addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
1497             return;
1498         }
1499         /* There was no previous master or the user specified a different one,
1500          * we can continue. */
1501         replicationSetMaster(c->argv[1]->ptr, port);
1502         redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
1503             server.masterhost, server.masterport);
1504     }
1505     addReply(c,shared.ok);
1506 }
1507 
1508 /* ROLE command: provide information about the role of the instance
1509  * (master or slave) and additional information related to replication
1510  * in an easy to process format. */
1511 void roleCommand(redisClient *c) {
1512     if (server.masterhost == NULL) {
1513         listIter li;
1514         listNode *ln;
1515         void *mbcount;
1516         int slaves = 0;
1517 
1518         addReplyMultiBulkLen(c,3);
1519         addReplyBulkCBuffer(c,"master",6);
1520         addReplyLongLong(c,server.master_repl_offset);
1521         mbcount = addDeferredMultiBulkLength(c);
1522         listRewind(server.slaves,&li);
1523         while((ln = listNext(&li))) {
1524             redisClient *slave = ln->value;
1525             char ip[REDIS_IP_STR_LEN];
1526 
1527             if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1) continue;
1528             if (slave->replstate != REDIS_REPL_ONLINE) continue;
1529             addReplyMultiBulkLen(c,3);
1530             addReplyBulkCString(c,ip);
1531             addReplyBulkLongLong(c,slave->slave_listening_port);
1532             addReplyBulkLongLong(c,slave->repl_ack_off);
1533             slaves++;
1534         }
1535         setDeferredMultiBulkLength(c,mbcount,slaves);
1536     } else {
1537         char *slavestate = NULL;
1538 
1539         addReplyMultiBulkLen(c,5);
1540         addReplyBulkCBuffer(c,"slave",5);
1541         addReplyBulkCString(c,server.masterhost);
1542         addReplyLongLong(c,server.masterport);
1543         switch(server.repl_state) {
1544         case REDIS_REPL_NONE: slavestate = "none"; break;
1545         case REDIS_REPL_CONNECT: slavestate = "connect"; break;
1546         case REDIS_REPL_CONNECTING: slavestate = "connecting"; break;
1547         case REDIS_REPL_RECEIVE_PONG: /* see next */
1548         case REDIS_REPL_TRANSFER: slavestate = "sync"; break;
1549         case REDIS_REPL_CONNECTED: slavestate = "connected"; break;
1550         default: slavestate = "unknown"; break;
1551         }
1552         addReplyBulkCString(c,slavestate);
1553         addReplyLongLong(c,server.master ? server.master->reploff : -1);
1554     }
1555 }
1556 
1557 /* Send a REPLCONF ACK command to the master to inform it about the current
1558  * processed offset. If we are not connected with a master, the command has
1559  * no effects. */
1560 void replicationSendAck(void) {
1561     redisClient *c = server.master;
1562 
1563     if (c != NULL) {
1564         c->flags |= REDIS_MASTER_FORCE_REPLY;
1565         addReplyMultiBulkLen(c,3);
1566         addReplyBulkCString(c,"REPLCONF");
1567         addReplyBulkCString(c,"ACK");
1568         addReplyBulkLongLong(c,c->reploff);
1569         c->flags &= ~REDIS_MASTER_FORCE_REPLY;
1570     }
1571 }
1572 
1573 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
1574 
1575 /* In order to implement partial synchronization we need to be able to cache
1576  * our master's client structure after a transient disconnection.
1577  * It is cached into server.cached_master and flushed away using the following
1578  * functions. */
1579 
1580 /* This function is called by freeClient() in order to cache the master
1581  * client structure instead of destryoing it. freeClient() will return
1582  * ASAP after this function returns, so every action needed to avoid problems
1583  * with a client that is really "suspended" has to be done by this function.
1584  *
1585  * The other functions that will deal with the cached master are:
1586  *
1587  * replicationDiscardCachedMaster() that will make sure to kill the client
1588  * as for some reason we don't want to use it in the future.
1589  *
1590  * replicationResurrectCachedMaster() that is used after a successful PSYNC
1591  * handshake in order to reactivate the cached master.
1592  */
1593 void replicationCacheMaster(redisClient *c) {
1594     listNode *ln;
1595 
1596     redisAssert(server.master != NULL && server.cached_master == NULL);
1597     redisLog(REDIS_NOTICE,"Caching the disconnected master state.");
1598 
1599     /* Remove from the list of clients, we don't want this client to be
1600      * listed by CLIENT LIST or processed in any way by batch operations. */
1601     ln = listSearchKey(server.clients,c);
1602     redisAssert(ln != NULL);
1603     listDelNode(server.clients,ln);
1604 
1605     /* Save the master. Server.master will be set to null later by
1606      * replicationHandleMasterDisconnection(). */
1607     server.cached_master = server.master;
1608 
1609     /* Remove the event handlers and close the socket. We'll later reuse
1610      * the socket of the new connection with the master during PSYNC. */
1611     aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1612     aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1613     close(c->fd);
1614 
1615     /* Set fd to -1 so that we can safely call freeClient(c) later. */
1616     c->fd = -1;
1617 
1618     /* Invalidate the Peer ID cache. */
1619     if (c->peerid) {
1620         sdsfree(c->peerid);
1621         c->peerid = NULL;
1622     }
1623 
1624     /* Caching the master happens instead of the actual freeClient() call,
1625      * so make sure to adjust the replication state. This function will
1626      * also set server.master to NULL. */
1627     replicationHandleMasterDisconnection();
1628 }
1629 
1630 /* Free a cached master, called when there are no longer the conditions for
1631  * a partial resync on reconnection. */
1632 void replicationDiscardCachedMaster(void) {
1633     if (server.cached_master == NULL) return;
1634 
1635     redisLog(REDIS_NOTICE,"Discarding previously cached master state.");
1636     server.cached_master->flags &= ~REDIS_MASTER;
1637     freeClient(server.cached_master);
1638     server.cached_master = NULL;
1639 }
1640 
1641 /* Turn the cached master into the current master, using the file descriptor
1642  * passed as argument as the socket for the new master.
1643  *
1644  * This function is called when successfully setup a partial resynchronization
1645  * so the stream of data that we'll receive will start from were this
1646  * master left. */
1647 void replicationResurrectCachedMaster(int newfd) {
1648     server.master = server.cached_master;
1649     server.cached_master = NULL;
1650     server.master->fd = newfd;
1651     server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
1652     server.master->authenticated = 1;
1653     server.master->lastinteraction = server.unixtime;
1654     server.repl_state = REDIS_REPL_CONNECTED;
1655 
1656     /* Re-add to the list of clients. */
1657     listAddNodeTail(server.clients,server.master);
1658     if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
1659                           readQueryFromClient, server.master)) {
1660         redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
1661         freeClientAsync(server.master); /* Close ASAP. */
1662     }
1663 
1664     /* We may also need to install the write handler as well if there is
1665      * pending data in the write buffers. */
1666     if (server.master->bufpos || listLength(server.master->reply)) {
1667         if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
1668                           sendReplyToClient, server.master)) {
1669             redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
1670             freeClientAsync(server.master); /* Close ASAP. */
1671         }
1672     }
1673 }
1674 
1675 /* ------------------------- MIN-SLAVES-TO-WRITE  --------------------------- */
1676 
1677 /* This function counts the number of slaves with lag <= min-slaves-max-lag.
1678  * If the option is active, the server will prevent writes if there are not
1679  * enough connected slaves with the specified lag (or less). */
1680 void refreshGoodSlavesCount(void) {
1681     listIter li;
1682     listNode *ln;
1683     int good = 0;
1684 
1685     if (!server.repl_min_slaves_to_write ||
1686         !server.repl_min_slaves_max_lag) return;
1687 
1688     listRewind(server.slaves,&li);
1689     while((ln = listNext(&li))) {
1690         redisClient *slave = ln->value;
1691         time_t lag = server.unixtime - slave->repl_ack_time;
1692 
1693         if (slave->replstate == REDIS_REPL_ONLINE &&
1694             lag <= server.repl_min_slaves_max_lag) good++;
1695     }
1696     server.repl_good_slaves_count = good;
1697 }
1698 
1699 /* ----------------------- REPLICATION SCRIPT CACHE --------------------------
1700  * The goal of this code is to keep track of scripts already sent to every
1701  * connected slave, in order to be able to replicate EVALSHA as it is without
1702  * translating it to EVAL every time it is possible.
1703  *
1704  * We use a capped collection implemented by a hash table for fast lookup
1705  * of scripts we can send as EVALSHA, plus a linked list that is used for
1706  * eviction of the oldest entry when the max number of items is reached.
1707  *
1708  * We don't care about taking a different cache for every different slave
1709  * since to fill the cache again is not very costly, the goal of this code
 * is to avoid that the same big script is transmitted a big number of times
1711  * per second wasting bandwidth and processor speed, but it is not a problem
1712  * if we need to rebuild the cache from scratch from time to time, every used
1713  * script will need to be transmitted a single time to reappear in the cache.
1714  *
1715  * This is how the system works:
1716  *
1717  * 1) Every time a new slave connects, we flush the whole script cache.
1718  * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
1719  *    trying to convert EVAL into EVALSHA specifically for slaves.
 * 3) Every time we transmit a script as EVAL to the slaves, we also add the
1721  *    corresponding SHA1 of the script into the cache as we are sure every
1722  *    slave knows about the script starting from now.
1723  * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
1724  *    and at the same time flush the script cache.
1725  * 5) When the last slave disconnects, flush the cache.
1726  * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
1727  *    in the master sometimes.
1728  */
1729 
1730 /* Initialize the script cache, only called at startup. */
1731 void replicationScriptCacheInit(void) {
1732     server.repl_scriptcache_size = 10000;
1733     server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
1734     server.repl_scriptcache_fifo = listCreate();
1735 }
1736 
1737 /* Empty the script cache. Should be called every time we are no longer sure
1738  * that every slave knows about all the scripts in our set, or when the
1739  * current AOF "context" is no longer aware of the script. In general we
1740  * should flush the cache:
1741  *
1742  * 1) Every time a new slave reconnects to this master and performs a
1743  *    full SYNC (PSYNC does not require flushing).
1744  * 2) Every time an AOF rewrite is performed.
1745  * 3) Every time we are left without slaves at all, and AOF is off, in order
1746  *    to reclaim otherwise unused memory.
1747  */
1748 void replicationScriptCacheFlush(void) {
1749     dictEmpty(server.repl_scriptcache_dict,NULL);
1750     listRelease(server.repl_scriptcache_fifo);
1751     server.repl_scriptcache_fifo = listCreate();
1752 }
1753 
1754 /* Add an entry into the script cache, if we reach max number of entries the
1755  * oldest is removed from the list. */
1756 void replicationScriptCacheAdd(sds sha1) {
1757     int retval;
1758     sds key = sdsdup(sha1);
1759 
1760     /* Evict oldest. */
1761     if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
1762     {
1763         listNode *ln = listLast(server.repl_scriptcache_fifo);
1764         sds oldest = listNodeValue(ln);
1765 
1766         retval = dictDelete(server.repl_scriptcache_dict,oldest);
1767         redisAssert(retval == DICT_OK);
1768         listDelNode(server.repl_scriptcache_fifo,ln);
1769     }
1770 
1771     /* Add current. */
1772     retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
1773     listAddNodeHead(server.repl_scriptcache_fifo,key);
1774     redisAssert(retval == DICT_OK);
1775 }
1776 
1777 /* Returns non-zero if the specified entry exists inside the cache, that is,
1778  * if all the slaves are aware of this script SHA1. */
1779 int replicationScriptCacheExists(sds sha1) {
1780     return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
1781 }
1782 
1783 /* ----------------------- SYNCHRONOUS REPLICATION --------------------------
1784  * Redis synchronous replication design can be summarized in points:
1785  *
1786  * - Redis masters have a global replication offset, used by PSYNC.
1787  * - Master increment the offset every time new commands are sent to slaves.
1788  * - Slaves ping back masters with the offset processed so far.
1789  *
1790  * So synchronous replication adds a new WAIT command in the form:
1791  *
1792  *   WAIT <num_replicas> <milliseconds_timeout>
1793  *
1794  * That returns the number of replicas that processed the query when
1795  * we finally have at least num_replicas, or when the timeout was
1796  * reached.
1797  *
1798  * The command is implemented in this way:
1799  *
1800  * - Every time a client processes a command, we remember the replication
1801  *   offset after sending that command to the slaves.
1802  * - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
1803  *   The client is blocked at the same time (see blocked.c).
1804  * - Once we receive enough ACKs for a given offset or when the timeout
1805  *   is reached, the WAIT command is unblocked and the reply sent to the
1806  *   client.
1807  */
1808 
1809 /* This just set a flag so that we broadcast a REPLCONF GETACK command
1810  * to all the slaves in the beforeSleep() function. Note that this way
1811  * we "group" all the clients that want to wait for synchronouns replication
1812  * in a given event loop iteration, and send a single GETACK for them all. */
1813 void replicationRequestAckFromSlaves(void) {
1814     server.get_ack_from_slaves = 1;
1815 }
1816 
1817 /* Return the number of slaves that already acknowledged the specified
1818  * replication offset. */
1819 int replicationCountAcksByOffset(long long offset) {
1820     listIter li;
1821     listNode *ln;
1822     int count = 0;
1823 
1824     listRewind(server.slaves,&li);
1825     while((ln = listNext(&li))) {
1826         redisClient *slave = ln->value;
1827 
1828         if (slave->replstate != REDIS_REPL_ONLINE) continue;
1829         if (slave->repl_ack_off >= offset) count++;
1830     }
1831     return count;
1832 }
1833 
1834 /* WAIT for N replicas to acknowledge the processing of our latest
1835  * write command (and all the previous commands). */
1836 void waitCommand(redisClient *c) {
1837     mstime_t timeout;
1838     long numreplicas, ackreplicas;
1839     long long offset = c->woff;
1840 
1841     /* Argument parsing. */
1842     if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != REDIS_OK)
1843         return;
1844     if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
1845         != REDIS_OK) return;
1846 
1847     /* First try without blocking at all. */
1848     ackreplicas = replicationCountAcksByOffset(c->woff);
1849     if (ackreplicas >= numreplicas || c->flags & REDIS_MULTI) {
1850         addReplyLongLong(c,ackreplicas);
1851         return;
1852     }
1853 
1854     /* Otherwise block the client and put it into our list of clients
1855      * waiting for ack from slaves. */
1856     c->bpop.timeout = timeout;
1857     c->bpop.reploffset = offset;
1858     c->bpop.numreplicas = numreplicas;
1859     listAddNodeTail(server.clients_waiting_acks,c);
1860     blockClient(c,REDIS_BLOCKED_WAIT);
1861 
1862     /* Make sure that the server will send an ACK request to all the slaves
1863      * before returning to the event loop. */
1864     replicationRequestAckFromSlaves();
1865 }
1866 
1867 /* This is called by unblockClient() to perform the blocking op type
1868  * specific cleanup. We just remove the client from the list of clients
1869  * waiting for replica acks. Never call it directly, call unblockClient()
1870  * instead. */
1871 void unblockClientWaitingReplicas(redisClient *c) {
1872     listNode *ln = listSearchKey(server.clients_waiting_acks,c);
1873     redisAssert(ln != NULL);
1874     listDelNode(server.clients_waiting_acks,ln);
1875 }
1876 
1877 /* Check if there are clients blocked in WAIT that can be unblocked since
1878  * we received enough ACKs from slaves. */
1879 void processClientsWaitingReplicas(void) {
1880     long long last_offset = 0;
1881     int last_numreplicas = 0;
1882 
1883     listIter li;
1884     listNode *ln;
1885 
1886     listRewind(server.clients_waiting_acks,&li);
1887     while((ln = listNext(&li))) {
1888         redisClient *c = ln->value;
1889 
1890         /* Every time we find a client that is satisfied for a given
1891          * offset and number of replicas, we remember it so the next client
1892          * may be unblocked without calling replicationCountAcksByOffset()
1893          * if the requested offset / replicas were equal or less. */
1894         if (last_offset && last_offset > c->bpop.reploffset &&
1895                            last_numreplicas > c->bpop.numreplicas)
1896         {
1897             unblockClient(c);
1898             addReplyLongLong(c,last_numreplicas);
1899         } else {
1900             int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
1901 
1902             if (numreplicas >= c->bpop.numreplicas) {
1903                 last_offset = c->bpop.reploffset;
1904                 last_numreplicas = numreplicas;
1905                 unblockClient(c);
1906                 addReplyLongLong(c,numreplicas);
1907             }
1908         }
1909     }
1910 }
1911 
1912 /* Return the slave replication offset for this instance, that is
1913  * the offset for which we already processed the master replication stream. */
1914 long long replicationGetSlaveOffset(void) {
1915     long long offset = 0;
1916 
1917     if (server.masterhost != NULL) {
1918         if (server.master) {
1919             offset = server.master->reploff;
1920         } else if (server.cached_master) {
1921             offset = server.cached_master->reploff;
1922         }
1923     }
1924     /* offset may be -1 when the master does not support it at all, however
1925      * this function is designed to return an offset that can express the
1926      * amount of data processed by the master, so we return a positive
1927      * integer. */
1928     if (offset < 0) offset = 0;
1929     return offset;
1930 }
1931 
1932 /* --------------------------- REPLICATION CRON  ---------------------------- */
1933 
1934 /* Replication cron function, called 1 time per second. */
1935 void replicationCron(void) {
1936     /* Non blocking connection timeout? */
1937     if (server.masterhost &&
1938         (server.repl_state == REDIS_REPL_CONNECTING ||
1939          server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
1940         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
1941     {
1942         redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
1943         undoConnectWithMaster();
1944     }
1945 
1946     /* Bulk transfer I/O timeout? */
1947     if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER &&
1948         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
1949     {
1950         redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
1951         replicationAbortSyncTransfer();
1952     }
1953 
1954     /* Timed out master when we are an already connected slave? */
1955     if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
1956         (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
1957     {
1958         redisLog(REDIS_WARNING,"MASTER timeout: no data nor PING received...");
1959         freeClient(server.master);
1960     }
1961 
1962     /* Check if we should connect to a MASTER */
1963     if (server.repl_state == REDIS_REPL_CONNECT) {
1964         redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
1965             server.masterhost, server.masterport);
1966         if (connectWithMaster() == REDIS_OK) {
1967             redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
1968         }
1969     }
1970 
1971     /* Send ACK to master from time to time.
1972      * Note that we do not send periodic acks to masters that don't
1973      * support PSYNC and replication offsets. */
1974     if (server.masterhost && server.master &&
1975         !(server.master->flags & REDIS_PRE_PSYNC))
1976         replicationSendAck();
1977 
1978     /* If we have attached slaves, PING them from time to time.
1979      * So slaves can implement an explicit timeout to masters, and will
1980      * be able to detect a link disconnection even if the TCP connection
1981      * will not actually go down. */
1982     if (!(server.cronloops % (server.repl_ping_slave_period * server.hz))) {
1983         listIter li;
1984         listNode *ln;
1985         robj *ping_argv[1];
1986 
1987         /* First, send PING */
1988         ping_argv[0] = createStringObject("PING",4);
1989         replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
1990         decrRefCount(ping_argv[0]);
1991 
1992         /* Second, send a newline to all the slaves in pre-synchronization
1993          * stage, that is, slaves waiting for the master to create the RDB file.
1994          * The newline will be ignored by the slave but will refresh the
1995          * last-io timer preventing a timeout. */
1996         listRewind(server.slaves,&li);
1997         while((ln = listNext(&li))) {
1998             redisClient *slave = ln->value;
1999 
2000             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START ||
2001                 (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END &&
2002                  server.rdb_child_type != REDIS_RDB_CHILD_TYPE_SOCKET))
2003             {
2004                 if (write(slave->fd, "\n", 1) == -1) {
2005                     /* Don't worry, it's just a ping. */
2006                 }
2007             }
2008         }
2009     }
2010 
2011     /* Disconnect timedout slaves. */
2012     if (listLength(server.slaves)) {
2013         listIter li;
2014         listNode *ln;
2015 
2016         listRewind(server.slaves,&li);
2017         while((ln = listNext(&li))) {
2018             redisClient *slave = ln->value;
2019 
2020             if (slave->replstate != REDIS_REPL_ONLINE) continue;
2021             if (slave->flags & REDIS_PRE_PSYNC) continue;
2022             if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
2023             {
2024                 redisLog(REDIS_WARNING, "Disconnecting timedout slave: %s",
2025                     replicationGetSlaveName(slave));
2026                 freeClient(slave);
2027             }
2028         }
2029     }
2030 
2031     /* If we have no attached slaves and there is a replication backlog
2032      * using memory, free it after some (configured) time. */
2033     if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
2034         server.repl_backlog)
2035     {
2036         time_t idle = server.unixtime - server.repl_no_slaves_since;
2037 
2038         if (idle > server.repl_backlog_time_limit) {
2039             freeReplicationBacklog();
2040             redisLog(REDIS_NOTICE,
2041                 "Replication backlog freed after %d seconds "
2042                 "without connected slaves.",
2043                 (int) server.repl_backlog_time_limit);
2044         }
2045     }
2046 
2047     /* If AOF is disabled and we no longer have attached slaves, we can
2048      * free our Replication Script Cache as there is no need to propagate
2049      * EVALSHA at all. */
2050     if (listLength(server.slaves) == 0 &&
2051         server.aof_state == REDIS_AOF_OFF &&
2052         listLength(server.repl_scriptcache_fifo) != 0)
2053     {
2054         replicationScriptCacheFlush();
2055     }
2056 
2057     /* If we are using diskless replication and there are slaves waiting
2058      * in WAIT_BGSAVE_START state, check if enough seconds elapsed and
2059      * start a BGSAVE.
2060      *
2061      * This code is also useful to trigger a BGSAVE if the diskless
2062      * replication was turned off with CONFIG SET, while there were already
2063      * slaves in WAIT_BGSAVE_START state. */
2064     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
2065         time_t idle, max_idle = 0;
2066         int slaves_waiting = 0;
2067         listNode *ln;
2068         listIter li;
2069 
2070         listRewind(server.slaves,&li);
2071         while((ln = listNext(&li))) {
2072             redisClient *slave = ln->value;
2073             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
2074                 idle = server.unixtime - slave->lastinteraction;
2075                 if (idle > max_idle) max_idle = idle;
2076                 slaves_waiting++;
2077             }
2078         }
2079 
2080         if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) {
2081             /* Start a BGSAVE. Usually with socket target, or with disk target
2082              * if there was a recent socket -> disk config change. */
2083             if (startBgsaveForReplication() == REDIS_OK) {
2084                 /* It started! We need to change the state of slaves
2085                  * from WAIT_BGSAVE_START to WAIT_BGSAVE_END in case
2086                  * the current target is disk. Otherwise it was already done
2087                  * by rdbSaveToSlavesSockets() which is called by
2088                  * startBgsaveForReplication(). */
2089                 listRewind(server.slaves,&li);
2090                 while((ln = listNext(&li))) {
2091                     redisClient *slave = ln->value;
2092                     if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
2093                         slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
2094                 }
2095             }
2096         }
2097     }
2098 
2099     /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
2100     refreshGoodSlavesCount();
2101 }
2102