xref: /redis-3.2.3/src/replication.c (revision e67ad1d1)
1 /* Asynchronous replication implementation.
2  *
3  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  *   * Redistributions of source code must retain the above copyright notice,
10  *     this list of conditions and the following disclaimer.
11  *   * Redistributions in binary form must reproduce the above copyright
12  *     notice, this list of conditions and the following disclaimer in the
13  *     documentation and/or other materials provided with the distribution.
14  *   * Neither the name of Redis nor the names of its contributors may be used
15  *     to endorse or promote products derived from this software without
16  *     specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  */
30 
31 
32 #include "server.h"
33 
34 #include <sys/time.h>
35 #include <unistd.h>
36 #include <fcntl.h>
37 #include <sys/socket.h>
38 #include <sys/stat.h>
39 
40 void replicationDiscardCachedMaster(void);
41 void replicationResurrectCachedMaster(int newfd);
42 void replicationSendAck(void);
43 void putSlaveOnline(client *slave);
44 int cancelReplicationHandshake(void);
45 
46 /* --------------------------- Utility functions ---------------------------- */
47 
48 /* Return the pointer to a string representing the slave ip:listening_port
49  * pair. Mostly useful for logging, since we want to log a slave using its
50  * IP address and its listening port which is more clear for the user, for
51  * example: "Closing connection with slave 10.1.2.3:6380". */
replicationGetSlaveName(client * c)52 char *replicationGetSlaveName(client *c) {
53     static char buf[NET_PEER_ID_LEN];
54     char ip[NET_IP_STR_LEN];
55 
56     ip[0] = '\0';
57     buf[0] = '\0';
58     if (c->slave_ip[0] != '\0' ||
59         anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1)
60     {
61         /* Note that the 'ip' buffer is always larger than 'c->slave_ip' */
62         if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip));
63 
64         if (c->slave_listening_port)
65             anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
66         else
67             snprintf(buf,sizeof(buf),"%s:<unknown-slave-port>",ip);
68     } else {
69         snprintf(buf,sizeof(buf),"client id #%llu",
70             (unsigned long long) c->id);
71     }
72     return buf;
73 }
74 
75 /* ---------------------------------- MASTER -------------------------------- */
76 
createReplicationBacklog(void)77 void createReplicationBacklog(void) {
78     serverAssert(server.repl_backlog == NULL);
79     server.repl_backlog = zmalloc(server.repl_backlog_size);
80     server.repl_backlog_histlen = 0;
81     server.repl_backlog_idx = 0;
82     /* When a new backlog buffer is created, we increment the replication
83      * offset by one to make sure we'll not be able to PSYNC with any
84      * previous slave. This is needed because we avoid incrementing the
85      * master_repl_offset if no backlog exists nor slaves are attached. */
86     server.master_repl_offset++;
87 
88     /* We don't have any data inside our buffer, but virtually the first
89      * byte we have is the next byte that will be generated for the
90      * replication stream. */
91     server.repl_backlog_off = server.master_repl_offset+1;
92 }
93 
94 /* This function is called when the user modifies the replication backlog
95  * size at runtime. It is up to the function to both update the
96  * server.repl_backlog_size and to resize the buffer and setup it so that
97  * it contains the same data as the previous one (possibly less data, but
98  * the most recent bytes, or the same data and more free space in case the
99  * buffer is enlarged). */
resizeReplicationBacklog(long long newsize)100 void resizeReplicationBacklog(long long newsize) {
101     if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
102         newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
103     if (server.repl_backlog_size == newsize) return;
104 
105     server.repl_backlog_size = newsize;
106     if (server.repl_backlog != NULL) {
107         /* What we actually do is to flush the old buffer and realloc a new
108          * empty one. It will refill with new data incrementally.
109          * The reason is that copying a few gigabytes adds latency and even
110          * worse often we need to alloc additional space before freeing the
111          * old buffer. */
112         zfree(server.repl_backlog);
113         server.repl_backlog = zmalloc(server.repl_backlog_size);
114         server.repl_backlog_histlen = 0;
115         server.repl_backlog_idx = 0;
116         /* Next byte we have is... the next since the buffer is empty. */
117         server.repl_backlog_off = server.master_repl_offset+1;
118     }
119 }
120 
freeReplicationBacklog(void)121 void freeReplicationBacklog(void) {
122     serverAssert(listLength(server.slaves) == 0);
123     zfree(server.repl_backlog);
124     server.repl_backlog = NULL;
125 }
126 
127 /* Add data to the replication backlog.
128  * This function also increments the global replication offset stored at
129  * server.master_repl_offset, because there is no case where we want to feed
130  * the backlog without incrementing the buffer. */
feedReplicationBacklog(void * ptr,size_t len)131 void feedReplicationBacklog(void *ptr, size_t len) {
132     unsigned char *p = ptr;
133 
134     server.master_repl_offset += len;
135 
136     /* This is a circular buffer, so write as much data we can at every
137      * iteration and rewind the "idx" index if we reach the limit. */
138     while(len) {
139         size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
140         if (thislen > len) thislen = len;
141         memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
142         server.repl_backlog_idx += thislen;
143         if (server.repl_backlog_idx == server.repl_backlog_size)
144             server.repl_backlog_idx = 0;
145         len -= thislen;
146         p += thislen;
147         server.repl_backlog_histlen += thislen;
148     }
149     if (server.repl_backlog_histlen > server.repl_backlog_size)
150         server.repl_backlog_histlen = server.repl_backlog_size;
151     /* Set the offset of the first byte we have in the backlog. */
152     server.repl_backlog_off = server.master_repl_offset -
153                               server.repl_backlog_histlen + 1;
154 }
155 
156 /* Wrapper for feedReplicationBacklog() that takes Redis string objects
157  * as input. */
feedReplicationBacklogWithObject(robj * o)158 void feedReplicationBacklogWithObject(robj *o) {
159     char llstr[LONG_STR_SIZE];
160     void *p;
161     size_t len;
162 
163     if (o->encoding == OBJ_ENCODING_INT) {
164         len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
165         p = llstr;
166     } else {
167         len = sdslen(o->ptr);
168         p = o->ptr;
169     }
170     feedReplicationBacklog(p,len);
171 }
172 
/* Propagate the command argv[0..argc-1], executed against database 'dictid',
 * to the replication backlog (if one exists) and to every slave in 'slaves'.
 *
 * The command is encoded in the Redis protocol as a multi-bulk request. When
 * the database last selected in the replication stream (server.slaveseldb)
 * differs from 'dictid', a SELECT command is emitted first.
 *
 * Slaves still in SLAVE_STATE_WAIT_BGSAVE_START are skipped. Every other
 * slave in 'slaves' receives the stream. This includes slaves waiting for
 * the initial SYNC, whose output buffers accumulate it until the sync
 * completes.
 *
 * The same 'slaves' list is used for the SELECT and for the command itself,
 * so both always reach the same set of clients. */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[LONG_STR_SIZE];

    /* If there aren't slaves, and there is no backlog buffer to populate,
     * we can return ASAP. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* We can't have slaves attached and no backlog. */
    serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    /* Send SELECT command to every slave if needed. */
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        /* For a few DBs we have pre-computed SELECT command. */
        if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(OBJ_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }

        /* Add the SELECT command into the backlog. */
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        /* Send it to slaves. */
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
            addReply(slave,selectcmd);
        }

        /* Only the dynamically created SELECT object is owned here. */
        if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }
    server.slaveseldb = dictid;

    /* Write the command to the replication backlog if any. */
    if (server.repl_backlog) {
        char aux[LONG_STR_SIZE+3];

        /* Add the multi bulk reply length: "*<argc>\r\n". */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);

        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            /* We need to feed the buffer with the object as a bulk reply
             * not just as a plain string, so create the $..CRLF payload len
             * and add the final CRLF (reusing the CRLF already in 'aux'). */
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);
            feedReplicationBacklogWithObject(argv[j]);
            feedReplicationBacklog(aux+len+1,2);
        }
    }

    /* Write the command to every slave in 'slaves': the same list that
     * received the SELECT above, so database selection and commands stay
     * consistent for each slave. */
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

        /* Feed slaves that are waiting for the initial SYNC (so these commands
         * are queued in the output buffer until the initial SYNC completes),
         * or are already in sync with the master. */

        /* Add the multi bulk length. */
        addReplyMultiBulkLen(slave,argc);

        /* Then every argument as a bulk string. */
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}
267 
/* Send a human-readable trace of the command argv[0..argc-1], executed by
 * client 'c' against database 'dictid', to every client in 'monitors'.
 *
 * Each line looks like:
 *   +<seconds>.<microseconds> [<db> <origin>] "arg0" "arg1" ...\r\n
 * <origin> is "lua" for scripts, "unix:<socket path>" for unix socket
 * clients, or the client's peer id otherwise. Arguments are quoted and
 * escaped with sdscatrepr(). Integer-encoded arguments are printed as
 * quoted decimals. */
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
    struct timeval now;
    sds line;
    robj *lineobj;
    listIter it;
    listNode *node;
    int i;

    gettimeofday(&now,NULL);
    line = sdscatprintf(sdsnew("+"),"%ld.%06ld ",
                        (long)now.tv_sec,(long)now.tv_usec);

    if (c->flags & CLIENT_LUA)
        line = sdscatprintf(line,"[%d lua] ",dictid);
    else if (c->flags & CLIENT_UNIX_SOCKET)
        line = sdscatprintf(line,"[%d unix:%s] ",dictid,server.unixsocket);
    else
        line = sdscatprintf(line,"[%d %s] ",dictid,getClientPeerId(c));

    for (i = 0; i < argc; i++) {
        robj *arg = argv[i];

        /* Space-separate the arguments. */
        if (i > 0) line = sdscatlen(line," ",1);

        if (arg->encoding == OBJ_ENCODING_INT)
            line = sdscatprintf(line, "\"%ld\"", (long)arg->ptr);
        else
            line = sdscatrepr(line,(char*)arg->ptr,sdslen(arg->ptr));
    }
    line = sdscatlen(line,"\r\n",2);

    /* Share a single reply object among all monitors. */
    lineobj = createObject(OBJ_STRING,line);
    listRewind(monitors,&it);
    while ((node = listNext(&it)) != NULL)
        addReply((client*)node->value,lineobj);
    decrRefCount(lineobj);
}
306 
307 /* Feed the slave 'c' with the replication backlog starting from the
308  * specified 'offset' up to the end of the backlog. */
addReplyReplicationBacklog(client * c,long long offset)309 long long addReplyReplicationBacklog(client *c, long long offset) {
310     long long j, skip, len;
311 
312     serverLog(LL_DEBUG, "[PSYNC] Slave request offset: %lld", offset);
313 
314     if (server.repl_backlog_histlen == 0) {
315         serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
316         return 0;
317     }
318 
319     serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
320              server.repl_backlog_size);
321     serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
322              server.repl_backlog_off);
323     serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
324              server.repl_backlog_histlen);
325     serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
326              server.repl_backlog_idx);
327 
328     /* Compute the amount of bytes we need to discard. */
329     skip = offset - server.repl_backlog_off;
330     serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
331 
332     /* Point j to the oldest byte, that is actaully our
333      * server.repl_backlog_off byte. */
334     j = (server.repl_backlog_idx +
335         (server.repl_backlog_size-server.repl_backlog_histlen)) %
336         server.repl_backlog_size;
337     serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
338 
339     /* Discard the amount of data to seek to the specified 'offset'. */
340     j = (j + skip) % server.repl_backlog_size;
341 
342     /* Feed slave with data. Since it is a circular buffer we have to
343      * split the reply in two parts if we are cross-boundary. */
344     len = server.repl_backlog_histlen - skip;
345     serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
346     while(len) {
347         long long thislen =
348             ((server.repl_backlog_size - j) < len) ?
349             (server.repl_backlog_size - j) : len;
350 
351         serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
352         addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
353         len -= thislen;
354         j = 0;
355     }
356     return server.repl_backlog_histlen - skip;
357 }
358 
359 /* Return the offset to provide as reply to the PSYNC command received
360  * from the slave. The returned value is only valid immediately after
361  * the BGSAVE process started and before executing any other command
362  * from clients. */
getPsyncInitialOffset(void)363 long long getPsyncInitialOffset(void) {
364     long long psync_offset = server.master_repl_offset;
365     /* Add 1 to psync_offset if it the replication backlog does not exists
366      * as when it will be created later we'll increment the offset by one. */
367     if (server.repl_backlog == NULL) psync_offset++;
368     return psync_offset;
369 }
370 
371 /* Send a FULLRESYNC reply in the specific case of a full resynchronization,
372  * as a side effect setup the slave for a full sync in different ways:
373  *
374  * 1) Remember, into the slave client structure, the offset we sent
375  *    here, so that if new slaves will later attach to the same
376  *    background RDB saving process (by duplicating this client output
377  *    buffer), we can get the right offset from this slave.
378  * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
379  *    we start accumulating differences from this point.
380  * 3) Force the replication stream to re-emit a SELECT statement so
381  *    the new slave incremental differences will start selecting the
382  *    right database number.
383  *
384  * Normally this function should be called immediately after a successful
385  * BGSAVE for replication was started, or when there is one already in
386  * progress that we attached our slave to. */
replicationSetupSlaveForFullResync(client * slave,long long offset)387 int replicationSetupSlaveForFullResync(client *slave, long long offset) {
388     char buf[128];
389     int buflen;
390 
391     slave->psync_initial_offset = offset;
392     slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
393     /* We are going to accumulate the incremental changes for this
394      * slave as well. Set slaveseldb to -1 in order to force to re-emit
395      * a SLEECT statement in the replication stream. */
396     server.slaveseldb = -1;
397 
398     /* Don't send this reply to slaves that approached us with
399      * the old SYNC command. */
400     if (!(slave->flags & CLIENT_PRE_PSYNC)) {
401         buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
402                           server.runid,offset);
403         if (write(slave->fd,buf,buflen) != buflen) {
404             freeClientAsync(slave);
405             return C_ERR;
406         }
407     }
408     return C_OK;
409 }
410 
411 /* This function handles the PSYNC command from the point of view of a
412  * master receiving a request for partial resynchronization.
413  *
414  * On success return C_OK, otherwise C_ERR is returned and we proceed
415  * with the usual full resync. */
masterTryPartialResynchronization(client * c)416 int masterTryPartialResynchronization(client *c) {
417     long long psync_offset, psync_len;
418     char *master_runid = c->argv[1]->ptr;
419     char buf[128];
420     int buflen;
421 
422     /* Is the runid of this master the same advertised by the wannabe slave
423      * via PSYNC? If runid changed this master is a different instance and
424      * there is no way to continue. */
425     if (strcasecmp(master_runid, server.runid)) {
426         /* Run id "?" is used by slaves that want to force a full resync. */
427         if (master_runid[0] != '?') {
428             serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
429                 "Runid mismatch (Client asked for runid '%s', my runid is '%s')",
430                 master_runid, server.runid);
431         } else {
432             serverLog(LL_NOTICE,"Full resync requested by slave %s",
433                 replicationGetSlaveName(c));
434         }
435         goto need_full_resync;
436     }
437 
438     /* We still have the data our slave is asking for? */
439     if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
440        C_OK) goto need_full_resync;
441     if (!server.repl_backlog ||
442         psync_offset < server.repl_backlog_off ||
443         psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
444     {
445         serverLog(LL_NOTICE,
446             "Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
447         if (psync_offset > server.master_repl_offset) {
448             serverLog(LL_WARNING,
449                 "Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
450         }
451         goto need_full_resync;
452     }
453 
454     /* If we reached this point, we are able to perform a partial resync:
455      * 1) Set client state to make it a slave.
456      * 2) Inform the client we can continue with +CONTINUE
457      * 3) Send the backlog data (from the offset to the end) to the slave. */
458     c->flags |= CLIENT_SLAVE;
459     c->replstate = SLAVE_STATE_ONLINE;
460     c->repl_ack_time = server.unixtime;
461     c->repl_put_online_on_ack = 0;
462     listAddNodeTail(server.slaves,c);
463     /* We can't use the connection buffers since they are used to accumulate
464      * new commands at this stage. But we are sure the socket send buffer is
465      * empty so this write will never fail actually. */
466     buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
467     if (write(c->fd,buf,buflen) != buflen) {
468         freeClientAsync(c);
469         return C_OK;
470     }
471     psync_len = addReplyReplicationBacklog(c,psync_offset);
472     serverLog(LL_NOTICE,
473         "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
474             replicationGetSlaveName(c),
475             psync_len, psync_offset);
476     /* Note that we don't need to set the selected DB at server.slaveseldb
477      * to -1 to force the master to emit SELECT, since the slave already
478      * has this state from the previous connection with the master. */
479 
480     refreshGoodSlavesCount();
481     return C_OK; /* The caller can return, no full resync needed. */
482 
483 need_full_resync:
484     /* We need a full resync for some reason... Note that we can't
485      * reply to PSYNC right now if a full SYNC is needed. The reply
486      * must include the master offset at the time the RDB file we transfer
487      * is generated, so we need to delay the reply to that moment. */
488     return C_ERR;
489 }
490 
491 /* Start a BGSAVE for replication goals, which is, selecting the disk or
492  * socket target depending on the configuration, and making sure that
493  * the script cache is flushed before to start.
494  *
495  * The mincapa argument is the bitwise AND among all the slaves capabilities
496  * of the slaves waiting for this BGSAVE, so represents the slave capabilities
497  * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
498  *
499  * Side effects, other than starting a BGSAVE:
500  *
501  * 1) Handle the slaves in WAIT_START state, by preparing them for a full
502  *    sync if the BGSAVE was succesfully started, or sending them an error
503  *    and dropping them from the list of slaves.
504  *
505  * 2) Flush the Lua scripting script cache if the BGSAVE was actually
506  *    started.
507  *
508  * Returns C_OK on success or C_ERR otherwise. */
startBgsaveForReplication(int mincapa)509 int startBgsaveForReplication(int mincapa) {
510     int retval;
511     int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
512     listIter li;
513     listNode *ln;
514 
515     serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
516         socket_target ? "slaves sockets" : "disk");
517 
518     if (socket_target)
519         retval = rdbSaveToSlavesSockets();
520     else
521         retval = rdbSaveBackground(server.rdb_filename);
522 
523     /* If we failed to BGSAVE, remove the slaves waiting for a full
524      * resynchorinization from the list of salves, inform them with
525      * an error about what happened, close the connection ASAP. */
526     if (retval == C_ERR) {
527         serverLog(LL_WARNING,"BGSAVE for replication failed");
528         listRewind(server.slaves,&li);
529         while((ln = listNext(&li))) {
530             client *slave = ln->value;
531 
532             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
533                 slave->flags &= ~CLIENT_SLAVE;
534                 listDelNode(server.slaves,ln);
535                 addReplyError(slave,
536                     "BGSAVE failed, replication can't continue");
537                 slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
538             }
539         }
540         return retval;
541     }
542 
543     /* If the target is socket, rdbSaveToSlavesSockets() already setup
544      * the salves for a full resync. Otherwise for disk target do it now.*/
545     if (!socket_target) {
546         listRewind(server.slaves,&li);
547         while((ln = listNext(&li))) {
548             client *slave = ln->value;
549 
550             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
551                     replicationSetupSlaveForFullResync(slave,
552                             getPsyncInitialOffset());
553             }
554         }
555     }
556 
557     /* Flush the script cache, since we need that slave differences are
558      * accumulated without requiring slaves to match our cached scripts. */
559     if (retval == C_OK) replicationScriptCacheFlush();
560     return retval;
561 }
562 
563 /* SYNC and PSYNC command implemenation. */
syncCommand(client * c)564 void syncCommand(client *c) {
565     /* ignore SYNC if already slave or in monitor mode */
566     if (c->flags & CLIENT_SLAVE) return;
567 
568     /* Refuse SYNC requests if we are a slave but the link with our master
569      * is not ok... */
570     if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
571         addReplyError(c,"Can't SYNC while not connected with my master");
572         return;
573     }
574 
575     /* SYNC can't be issued when the server has pending data to send to
576      * the client about already issued commands. We need a fresh reply
577      * buffer registering the differences between the BGSAVE and the current
578      * dataset, so that we can copy to other slaves if needed. */
579     if (clientHasPendingReplies(c)) {
580         addReplyError(c,"SYNC and PSYNC are invalid with pending output");
581         return;
582     }
583 
584     serverLog(LL_NOTICE,"Slave %s asks for synchronization",
585         replicationGetSlaveName(c));
586 
587     /* Try a partial resynchronization if this is a PSYNC command.
588      * If it fails, we continue with usual full resynchronization, however
589      * when this happens masterTryPartialResynchronization() already
590      * replied with:
591      *
592      * +FULLRESYNC <runid> <offset>
593      *
594      * So the slave knows the new runid and offset to try a PSYNC later
595      * if the connection with the master is lost. */
596     if (!strcasecmp(c->argv[0]->ptr,"psync")) {
597         if (masterTryPartialResynchronization(c) == C_OK) {
598             server.stat_sync_partial_ok++;
599             return; /* No full resync needed, return. */
600         } else {
601             char *master_runid = c->argv[1]->ptr;
602 
603             /* Increment stats for failed PSYNCs, but only if the
604              * runid is not "?", as this is used by slaves to force a full
605              * resync on purpose when they are not albe to partially
606              * resync. */
607             if (master_runid[0] != '?') server.stat_sync_partial_err++;
608         }
609     } else {
610         /* If a slave uses SYNC, we are dealing with an old implementation
611          * of the replication protocol (like redis-cli --slave). Flag the client
612          * so that we don't expect to receive REPLCONF ACK feedbacks. */
613         c->flags |= CLIENT_PRE_PSYNC;
614     }
615 
616     /* Full resynchronization. */
617     server.stat_sync_full++;
618 
619     /* Setup the slave as one waiting for BGSAVE to start. The following code
620      * paths will change the state if we handle the slave differently. */
621     c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
622     if (server.repl_disable_tcp_nodelay)
623         anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
624     c->repldbfd = -1;
625     c->flags |= CLIENT_SLAVE;
626     listAddNodeTail(server.slaves,c);
627 
628     /* CASE 1: BGSAVE is in progress, with disk target. */
629     if (server.rdb_child_pid != -1 &&
630         server.rdb_child_type == RDB_CHILD_TYPE_DISK)
631     {
632         /* Ok a background save is in progress. Let's check if it is a good
633          * one for replication, i.e. if there is another slave that is
634          * registering differences since the server forked to save. */
635         client *slave;
636         listNode *ln;
637         listIter li;
638 
639         listRewind(server.slaves,&li);
640         while((ln = listNext(&li))) {
641             slave = ln->value;
642             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
643         }
644         /* To attach this slave, we check that it has at least all the
645          * capabilities of the slave that triggered the current BGSAVE. */
646         if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
647             /* Perfect, the server is already registering differences for
648              * another slave. Set the right state, and copy the buffer. */
649             copyClientOutputBuffer(c,slave);
650             replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
651             serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
652         } else {
653             /* No way, we need to wait for the next BGSAVE in order to
654              * register differences. */
655             serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC");
656         }
657 
658     /* CASE 2: BGSAVE is in progress, with socket target. */
659     } else if (server.rdb_child_pid != -1 &&
660                server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
661     {
662         /* There is an RDB child process but it is writing directly to
663          * children sockets. We need to wait for the next BGSAVE
664          * in order to synchronize. */
665         serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
666 
667     /* CASE 3: There is no BGSAVE is progress. */
668     } else {
669         if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
670             /* Diskless replication RDB child is created inside
671              * replicationCron() since we want to delay its start a
672              * few seconds to wait for more slaves to arrive. */
673             if (server.repl_diskless_sync_delay)
674                 serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
675         } else {
676             /* Target is disk (or the slave is not capable of supporting
677              * diskless replication) and we don't have a BGSAVE in progress,
678              * let's start one. */
679             if (server.aof_child_pid == -1) {
680                 startBgsaveForReplication(c->slave_capa);
681             } else {
682                 serverLog(LL_NOTICE,
683                     "No BGSAVE in progress, but an AOF rewrite is active. "
684                     "BGSAVE for replication delayed");
685             }
686         }
687     }
688 
689     if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
690         createReplicationBacklog();
691     return;
692 }
693 
694 /* REPLCONF <option> <value> <option> <value> ...
695  * This command is used by a slave in order to configure the replication
696  * process before starting it with the SYNC command.
697  *
698  * Currently the only use of this command is to communicate to the master
699  * what is the listening port of the Slave redis instance, so that the
700  * master can accurately list slaves and their listening ports in
701  * the INFO output.
702  *
703  * In the future the same command can be used in order to configure
704  * the replication to initiate an incremental replication instead of a
705  * full resync. */
replconfCommand(client * c)706 void replconfCommand(client *c) {
707     int j;
708 
709     if ((c->argc % 2) == 0) {
710         /* Number of arguments must be odd to make sure that every
711          * option has a corresponding value. */
712         addReply(c,shared.syntaxerr);
713         return;
714     }
715 
716     /* Process every option-value pair. */
717     for (j = 1; j < c->argc; j+=2) {
718         if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
719             long port;
720 
721             if ((getLongFromObjectOrReply(c,c->argv[j+1],
722                     &port,NULL) != C_OK))
723                 return;
724             c->slave_listening_port = port;
725         } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
726             sds ip = c->argv[j+1]->ptr;
727             if (sdslen(ip) < sizeof(c->slave_ip)) {
728                 memcpy(c->slave_ip,ip,sdslen(ip)+1);
729             } else {
730                 addReplyErrorFormat(c,"REPLCONF ip-address provided by "
731                     "slave instance is too long: %zd bytes", sdslen(ip));
732                 return;
733             }
734         } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
735             /* Ignore capabilities not understood by this master. */
736             if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
737                 c->slave_capa |= SLAVE_CAPA_EOF;
738         } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
739             /* REPLCONF ACK is used by slave to inform the master the amount
740              * of replication stream that it processed so far. It is an
741              * internal only command that normal clients should never use. */
742             long long offset;
743 
744             if (!(c->flags & CLIENT_SLAVE)) return;
745             if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
746                 return;
747             if (offset > c->repl_ack_off)
748                 c->repl_ack_off = offset;
749             c->repl_ack_time = server.unixtime;
750             /* If this was a diskless replication, we need to really put
751              * the slave online when the first ACK is received (which
752              * confirms slave is online and ready to get more data). */
753             if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
754                 putSlaveOnline(c);
755             /* Note: this command does not reply anything! */
756             return;
757         } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
758             /* REPLCONF GETACK is used in order to request an ACK ASAP
759              * to the slave. */
760             if (server.masterhost && server.master) replicationSendAck();
761             /* Note: this command does not reply anything! */
762         } else {
763             addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
764                 (char*)c->argv[j]->ptr);
765             return;
766         }
767     }
768     addReply(c,shared.ok);
769 }
770 
771 /* This function puts a slave in the online state, and should be called just
772  * after a slave received the RDB file for the initial synchronization, and
773  * we are finally ready to send the incremental stream of commands.
774  *
775  * It does a few things:
776  *
777  * 1) Put the slave in ONLINE state (useless when the function is called
778  *    because state is already ONLINE but repl_put_online_on_ack is true).
779  * 2) Make sure the writable event is re-installed, since calling the SYNC
780  *    command disables it, so that we can accumulate output buffer without
781  *    sending it to the slave.
782  * 3) Update the count of good slaves. */
putSlaveOnline(client * slave)783 void putSlaveOnline(client *slave) {
784     slave->replstate = SLAVE_STATE_ONLINE;
785     slave->repl_put_online_on_ack = 0;
786     slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
787     if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
788         sendReplyToClient, slave) == AE_ERR) {
789         serverLog(LL_WARNING,"Unable to register writable event for slave bulk transfer: %s", strerror(errno));
790         freeClient(slave);
791         return;
792     }
793     refreshGoodSlavesCount();
794     serverLog(LL_NOTICE,"Synchronization with slave %s succeeded",
795         replicationGetSlaveName(slave));
796 }
797 
/* Writable-event handler that streams the on-disk RDB file to a slave.
 * updateSlavesWaitingBgsave() installs it after a disk BGSAVE, once the
 * slave has been switched to SLAVE_STATE_SEND_BULK.
 *
 * Each call first drains the pending preamble (the bulk header
 * "$<length>\r\n" kept in slave->replpreamble). It then copies at most
 * PROTO_IOBUF_LEN bytes of the RDB file, starting at slave->repldboff. When
 * repldboff reaches repldbsize, the file is closed, this handler is removed
 * and the slave is put online.
 *
 * EAGAIN on the socket only postpones the work to the next writable event.
 * Any other read/seek/write error frees the slave client. */
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *slave = privdata;
    UNUSED(el);
    UNUSED(mask);
    char buf[PROTO_IOBUF_LEN];
    ssize_t nwritten, buflen;

    /* Send the preamble before the RDB data. It may need several calls. */
    if (slave->replpreamble) {
        nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
        if (nwritten == -1) {
            /* Same policy as the bulk write below: EAGAIN is not an error,
             * just retry on the next writable event. */
            if (errno == EAGAIN) return;
            serverLog(LL_VERBOSE,"Write error sending RDB preamble to slave: %s",
                strerror(errno));
            freeClient(slave);
            return;
        }
        server.stat_net_output_bytes += nwritten;
        sdsrange(slave->replpreamble,nwritten,-1);
        if (sdslen(slave->replpreamble) == 0) {
            sdsfree(slave->replpreamble);
            slave->replpreamble = NULL;
            /* fall through sending data. */
        } else {
            return;
        }
    }

    /* If the preamble was already transferred, send the RDB bulk data. */
    if (lseek(slave->repldbfd,slave->repldboff,SEEK_SET) == -1) {
        /* Reading from a wrong position would corrupt the stream. */
        serverLog(LL_WARNING,"Seek error sending DB to slave: %s",
            strerror(errno));
        freeClient(slave);
        return;
    }
    buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
    if (buflen <= 0) {
        serverLog(LL_WARNING,"Read error sending DB to slave: %s",
            (buflen == 0) ? "premature EOF" : strerror(errno));
        freeClient(slave);
        return;
    }
    if ((nwritten = write(fd,buf,buflen)) == -1) {
        if (errno != EAGAIN) {
            serverLog(LL_WARNING,"Write error sending DB to slave: %s",
                strerror(errno));
            freeClient(slave);
        }
        return;
    }
    slave->repldboff += nwritten;
    server.stat_net_output_bytes += nwritten;
    if (slave->repldboff == slave->repldbsize) {
        close(slave->repldbfd);
        slave->repldbfd = -1;
        aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
        putSlaveOnline(slave);
    }
}
853 
854 /* This function is called at the end of every background saving,
855  * or when the replication RDB transfer strategy is modified from
856  * disk to socket or the other way around.
857  *
858  * The goal of this function is to handle slaves waiting for a successful
859  * background saving in order to perform non-blocking synchronization, and
860  * to schedule a new BGSAVE if there are slaves that attached while a
861  * BGSAVE was in progress, but it was not a good one for replication (no
862  * other slave was accumulating differences).
863  *
864  * The argument bgsaveerr is C_OK if the background saving succeeded
865  * otherwise C_ERR is passed to the function.
866  * The 'type' argument is the type of the child that terminated
867  * (if it had a disk or socket target). */
updateSlavesWaitingBgsave(int bgsaveerr,int type)868 void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
869     listNode *ln;
870     int startbgsave = 0;
871     int mincapa = -1;
872     listIter li;
873 
874     listRewind(server.slaves,&li);
875     while((ln = listNext(&li))) {
876         client *slave = ln->value;
877 
878         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
879             startbgsave = 1;
880             mincapa = (mincapa == -1) ? slave->slave_capa :
881                                         (mincapa & slave->slave_capa);
882         } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
883             struct redis_stat buf;
884 
885             /* If this was an RDB on disk save, we have to prepare to send
886              * the RDB from disk to the slave socket. Otherwise if this was
887              * already an RDB -> Slaves socket transfer, used in the case of
888              * diskless replication, our work is trivial, we can just put
889              * the slave online. */
890             if (type == RDB_CHILD_TYPE_SOCKET) {
891                 serverLog(LL_NOTICE,
892                     "Streamed RDB transfer with slave %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
893                         replicationGetSlaveName(slave));
894                 /* Note: we wait for a REPLCONF ACK message from slave in
895                  * order to really put it online (install the write handler
896                  * so that the accumulated data can be transfered). However
897                  * we change the replication state ASAP, since our slave
898                  * is technically online now. */
899                 slave->replstate = SLAVE_STATE_ONLINE;
900                 slave->repl_put_online_on_ack = 1;
901                 slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
902             } else {
903                 if (bgsaveerr != C_OK) {
904                     freeClient(slave);
905                     serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
906                     continue;
907                 }
908                 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
909                     redis_fstat(slave->repldbfd,&buf) == -1) {
910                     freeClient(slave);
911                     serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
912                     continue;
913                 }
914                 slave->repldboff = 0;
915                 slave->repldbsize = buf.st_size;
916                 slave->replstate = SLAVE_STATE_SEND_BULK;
917                 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
918                     (unsigned long long) slave->repldbsize);
919 
920                 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
921                 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
922                     freeClient(slave);
923                     continue;
924                 }
925             }
926         }
927     }
928     if (startbgsave) startBgsaveForReplication(mincapa);
929 }
930 
931 /* ----------------------------------- SLAVE -------------------------------- */
932 
933 /* Returns 1 if the given replication state is a handshake state,
934  * 0 otherwise. */
slaveIsInHandshakeState(void)935 int slaveIsInHandshakeState(void) {
936     return server.repl_state >= REPL_STATE_RECEIVE_PONG &&
937            server.repl_state <= REPL_STATE_RECEIVE_PSYNC;
938 }
939 
940 /* Avoid the master to detect the slave is timing out while loading the
941  * RDB file in initial synchronization. We send a single newline character
942  * that is valid protocol but is guaranteed to either be sent entierly or
943  * not, since the byte is indivisible.
944  *
945  * The function is called in two contexts: while we flush the current
946  * data with emptyDb(), and while we load the new data received as an
947  * RDB file from the master. */
replicationSendNewlineToMaster(void)948 void replicationSendNewlineToMaster(void) {
949     static time_t newline_sent;
950     if (time(NULL) != newline_sent) {
951         newline_sent = time(NULL);
952         if (write(server.repl_transfer_s,"\n",1) == -1) {
953             /* Pinging back in this stage is best-effort. */
954         }
955     }
956 }
957 
/* Progress callback handed to emptyDb() while the old dataset is flushed
 * before loading the one received from the master. It only keeps the
 * master link alive via replicationSendNewlineToMaster(); 'privdata' is
 * not used. */
void replicationEmptyDbCallback(void *privdata) {
    (void)privdata;
    replicationSendNewlineToMaster();
}
964 
965 /* Once we have a link with the master and the synchroniziation was
966  * performed, this function materializes the master client we store
967  * at server.master, starting from the specified file descriptor. */
replicationCreateMasterClient(int fd)968 void replicationCreateMasterClient(int fd) {
969     server.master = createClient(fd);
970     server.master->flags |= CLIENT_MASTER;
971     server.master->authenticated = 1;
972     server.repl_state = REPL_STATE_CONNECTED;
973     server.master->reploff = server.repl_master_initial_offset;
974     memcpy(server.master->replrunid, server.repl_master_runid,
975         sizeof(server.repl_master_runid));
976     /* If master offset is set to -1, this master is old and is not
977      * PSYNC capable, so we flag it accordingly. */
978     if (server.master->reploff == -1)
979         server.master->flags |= CLIENT_PRE_PSYNC;
980 }
981 
982 /* Asynchronously read the SYNC payload we receive from a master */
983 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
readSyncBulkPayload(aeEventLoop * el,int fd,void * privdata,int mask)984 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
985     char buf[4096];
986     ssize_t nread, readlen;
987     off_t left;
988     UNUSED(el);
989     UNUSED(privdata);
990     UNUSED(mask);
991 
992     /* Static vars used to hold the EOF mark, and the last bytes received
993      * form the server: when they match, we reached the end of the transfer. */
994     static char eofmark[CONFIG_RUN_ID_SIZE];
995     static char lastbytes[CONFIG_RUN_ID_SIZE];
996     static int usemark = 0;
997 
998     /* If repl_transfer_size == -1 we still have to read the bulk length
999      * from the master reply. */
1000     if (server.repl_transfer_size == -1) {
1001         if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
1002             serverLog(LL_WARNING,
1003                 "I/O error reading bulk count from MASTER: %s",
1004                 strerror(errno));
1005             goto error;
1006         }
1007 
1008         if (buf[0] == '-') {
1009             serverLog(LL_WARNING,
1010                 "MASTER aborted replication with an error: %s",
1011                 buf+1);
1012             goto error;
1013         } else if (buf[0] == '\0') {
1014             /* At this stage just a newline works as a PING in order to take
1015              * the connection live. So we refresh our last interaction
1016              * timestamp. */
1017             server.repl_transfer_lastio = server.unixtime;
1018             return;
1019         } else if (buf[0] != '$') {
1020             serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
1021             goto error;
1022         }
1023 
1024         /* There are two possible forms for the bulk payload. One is the
1025          * usual $<count> bulk format. The other is used for diskless transfers
1026          * when the master does not know beforehand the size of the file to
1027          * transfer. In the latter case, the following format is used:
1028          *
1029          * $EOF:<40 bytes delimiter>
1030          *
1031          * At the end of the file the announced delimiter is transmitted. The
1032          * delimiter is long and random enough that the probability of a
1033          * collision with the actual file content can be ignored. */
1034         if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
1035             usemark = 1;
1036             memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
1037             memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
1038             /* Set any repl_transfer_size to avoid entering this code path
1039              * at the next call. */
1040             server.repl_transfer_size = 0;
1041             serverLog(LL_NOTICE,
1042                 "MASTER <-> SLAVE sync: receiving streamed RDB from master");
1043         } else {
1044             usemark = 0;
1045             server.repl_transfer_size = strtol(buf+1,NULL,10);
1046             serverLog(LL_NOTICE,
1047                 "MASTER <-> SLAVE sync: receiving %lld bytes from master",
1048                 (long long) server.repl_transfer_size);
1049         }
1050         return;
1051     }
1052 
1053     /* Read bulk data */
1054     if (usemark) {
1055         readlen = sizeof(buf);
1056     } else {
1057         left = server.repl_transfer_size - server.repl_transfer_read;
1058         readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
1059     }
1060 
1061     nread = read(fd,buf,readlen);
1062     if (nread <= 0) {
1063         serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
1064             (nread == -1) ? strerror(errno) : "connection lost");
1065         cancelReplicationHandshake();
1066         return;
1067     }
1068     server.stat_net_input_bytes += nread;
1069 
1070     /* When a mark is used, we want to detect EOF asap in order to avoid
1071      * writing the EOF mark into the file... */
1072     int eof_reached = 0;
1073 
1074     if (usemark) {
1075         /* Update the last bytes array, and check if it matches our delimiter.*/
1076         if (nread >= CONFIG_RUN_ID_SIZE) {
1077             memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
1078         } else {
1079             int rem = CONFIG_RUN_ID_SIZE-nread;
1080             memmove(lastbytes,lastbytes+nread,rem);
1081             memcpy(lastbytes+rem,buf,nread);
1082         }
1083         if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
1084     }
1085 
1086     server.repl_transfer_lastio = server.unixtime;
1087     if (write(server.repl_transfer_fd,buf,nread) != nread) {
1088         serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
1089         goto error;
1090     }
1091     server.repl_transfer_read += nread;
1092 
1093     /* Delete the last 40 bytes from the file if we reached EOF. */
1094     if (usemark && eof_reached) {
1095         if (ftruncate(server.repl_transfer_fd,
1096             server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
1097         {
1098             serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
1099             goto error;
1100         }
1101     }
1102 
1103     /* Sync data on disk from time to time, otherwise at the end of the transfer
1104      * we may suffer a big delay as the memory buffers are copied into the
1105      * actual disk. */
1106     if (server.repl_transfer_read >=
1107         server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
1108     {
1109         off_t sync_size = server.repl_transfer_read -
1110                           server.repl_transfer_last_fsync_off;
1111         rdb_fsync_range(server.repl_transfer_fd,
1112             server.repl_transfer_last_fsync_off, sync_size);
1113         server.repl_transfer_last_fsync_off += sync_size;
1114     }
1115 
1116     /* Check if the transfer is now complete */
1117     if (!usemark) {
1118         if (server.repl_transfer_read == server.repl_transfer_size)
1119             eof_reached = 1;
1120     }
1121 
1122     if (eof_reached) {
1123         if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
1124             serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
1125             cancelReplicationHandshake();
1126             return;
1127         }
1128         serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
1129         signalFlushedDb(-1);
1130         emptyDb(replicationEmptyDbCallback);
1131         /* Before loading the DB into memory we need to delete the readable
1132          * handler, otherwise it will get called recursively since
1133          * rdbLoad() will call the event loop to process events from time to
1134          * time for non blocking loading. */
1135         aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
1136         serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
1137         if (rdbLoad(server.rdb_filename) != C_OK) {
1138             serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
1139             cancelReplicationHandshake();
1140             return;
1141         }
1142         /* Final setup of the connected slave <- master link */
1143         zfree(server.repl_transfer_tmpfile);
1144         close(server.repl_transfer_fd);
1145         replicationCreateMasterClient(server.repl_transfer_s);
1146         serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
1147         /* Restart the AOF subsystem now that we finished the sync. This
1148          * will trigger an AOF rewrite, and when done will start appending
1149          * to the new file. */
1150         if (server.aof_state != AOF_OFF) {
1151             int retry = 10;
1152 
1153             stopAppendOnly();
1154             while (retry-- && startAppendOnly() == C_ERR) {
1155                 serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
1156                 sleep(1);
1157             }
1158             if (!retry) {
1159                 serverLog(LL_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
1160                 exit(1);
1161             }
1162         }
1163     }
1164 
1165     return;
1166 
1167 error:
1168     cancelReplicationHandshake();
1169     return;
1170 }
1171 
1172 /* Send a synchronous command to the master. Used to send AUTH and
1173  * REPLCONF commands before starting the replication with SYNC.
1174  *
1175  * The command returns an sds string representing the result of the
1176  * operation. On error the first byte is a "-".
1177  */
1178 #define SYNC_CMD_READ (1<<0)
1179 #define SYNC_CMD_WRITE (1<<1)
1180 #define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE)
sendSynchronousCommand(int flags,int fd,...)1181 char *sendSynchronousCommand(int flags, int fd, ...) {
1182 
1183     /* Create the command to send to the master, we use simple inline
1184      * protocol for simplicity as currently we only send simple strings. */
1185     if (flags & SYNC_CMD_WRITE) {
1186         char *arg;
1187         va_list ap;
1188         sds cmd = sdsempty();
1189         va_start(ap,fd);
1190 
1191         while(1) {
1192             arg = va_arg(ap, char*);
1193             if (arg == NULL) break;
1194 
1195             if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
1196             cmd = sdscat(cmd,arg);
1197         }
1198         cmd = sdscatlen(cmd,"\r\n",2);
1199 
1200         /* Transfer command to the server. */
1201         if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000)
1202             == -1)
1203         {
1204             sdsfree(cmd);
1205             return sdscatprintf(sdsempty(),"-Writing to master: %s",
1206                     strerror(errno));
1207         }
1208         sdsfree(cmd);
1209         va_end(ap);
1210     }
1211 
1212     /* Read the reply from the server. */
1213     if (flags & SYNC_CMD_READ) {
1214         char buf[256];
1215 
1216         if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000)
1217             == -1)
1218         {
1219             return sdscatprintf(sdsempty(),"-Reading from master: %s",
1220                     strerror(errno));
1221         }
1222         server.repl_transfer_lastio = server.unixtime;
1223         return sdsnew(buf);
1224     }
1225     return NULL;
1226 }
1227 
1228 /* Try a partial resynchronization with the master if we are about to reconnect.
1229  * If there is no cached master structure, at least try to issue a
1230  * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
1231  * command in order to obtain the master run id and the master replication
1232  * global offset.
1233  *
1234  * This function is designed to be called from syncWithMaster(), so the
1235  * following assumptions are made:
1236  *
1237  * 1) We pass the function an already connected socket "fd".
1238  * 2) This function does not close the file descriptor "fd". However in case
1239  *    of successful partial resynchronization, the function will reuse
1240  *    'fd' as file descriptor of the server.master client structure.
1241  *
1242  * The function is split in two halves: if read_reply is 0, the function
1243  * writes the PSYNC command on the socket, and a new function call is
1244  * needed, with read_reply set to 1, in order to read the reply of the
1245  * command. This is useful in order to support non blocking operations, so
1246  * that we write, return into the event loop, and read when there are data.
1247  *
1248  * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there
1249  * was a write error, or PSYNC_WAIT_REPLY to signal we need another call
1250  * with read_reply set to 1. However even when read_reply is set to 1
1251  * the function may return PSYNC_WAIT_REPLY again to signal there were
1252  * insufficient data to read to complete its work. We should re-enter
1253  * into the event loop and wait in such a case.
1254  *
1255  * The function returns:
1256  *
1257  * PSYNC_CONTINUE: If the PSYNC command succeded and we can continue.
1258  * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
1259  *                   In this case the master run_id and global replication
1260  *                   offset is saved.
1261  * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
1262  *                      the caller should fall back to SYNC.
1263  * PSYNC_WRITE_ERR: There was an error writing the command to the socket.
1264  * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
1265  *
1266  * Notable side effects:
1267  *
1268  * 1) As a side effect of the function call the function removes the readable
1269  *    event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
1270  * 2) server.repl_master_initial_offset is set to the right value according
1271  *    to the master reply. This will be used to populate the 'server.master'
1272  *    structure replication offset.
1273  */
1274 
1275 #define PSYNC_WRITE_ERROR 0
1276 #define PSYNC_WAIT_REPLY 1
1277 #define PSYNC_CONTINUE 2
1278 #define PSYNC_FULLRESYNC 3
1279 #define PSYNC_NOT_SUPPORTED 4
slaveTryPartialResynchronization(int fd,int read_reply)1280 int slaveTryPartialResynchronization(int fd, int read_reply) {
1281     char *psync_runid;
1282     char psync_offset[32];
1283     sds reply;
1284 
1285     /* Writing half */
1286     if (!read_reply) {
1287         /* Initially set repl_master_initial_offset to -1 to mark the current
1288          * master run_id and offset as not valid. Later if we'll be able to do
1289          * a FULL resync using the PSYNC command we'll set the offset at the
1290          * right value, so that this information will be propagated to the
1291          * client structure representing the master into server.master. */
1292         server.repl_master_initial_offset = -1;
1293 
1294         if (server.cached_master) {
1295             psync_runid = server.cached_master->replrunid;
1296             snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
1297             serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
1298         } else {
1299             serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
1300             psync_runid = "?";
1301             memcpy(psync_offset,"-1",3);
1302         }
1303 
1304         /* Issue the PSYNC command */
1305         reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL);
1306         if (reply != NULL) {
1307             serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
1308             sdsfree(reply);
1309             aeDeleteFileEvent(server.el,fd,AE_READABLE);
1310             return PSYNC_WRITE_ERROR;
1311         }
1312         return PSYNC_WAIT_REPLY;
1313     }
1314 
1315     /* Reading half */
1316     reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
1317     if (sdslen(reply) == 0) {
1318         /* The master may send empty newlines after it receives PSYNC
1319          * and before to reply, just to keep the connection alive. */
1320         sdsfree(reply);
1321         return PSYNC_WAIT_REPLY;
1322     }
1323 
1324     aeDeleteFileEvent(server.el,fd,AE_READABLE);
1325 
1326     if (!strncmp(reply,"+FULLRESYNC",11)) {
1327         char *runid = NULL, *offset = NULL;
1328 
1329         /* FULL RESYNC, parse the reply in order to extract the run id
1330          * and the replication offset. */
1331         runid = strchr(reply,' ');
1332         if (runid) {
1333             runid++;
1334             offset = strchr(runid,' ');
1335             if (offset) offset++;
1336         }
1337         if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) {
1338             serverLog(LL_WARNING,
1339                 "Master replied with wrong +FULLRESYNC syntax.");
1340             /* This is an unexpected condition, actually the +FULLRESYNC
1341              * reply means that the master supports PSYNC, but the reply
1342              * format seems wrong. To stay safe we blank the master
1343              * runid to make sure next PSYNCs will fail. */
1344             memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1);
1345         } else {
1346             memcpy(server.repl_master_runid, runid, offset-runid-1);
1347             server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0';
1348             server.repl_master_initial_offset = strtoll(offset,NULL,10);
1349             serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
1350                 server.repl_master_runid,
1351                 server.repl_master_initial_offset);
1352         }
1353         /* We are going to full resync, discard the cached master structure. */
1354         replicationDiscardCachedMaster();
1355         sdsfree(reply);
1356         return PSYNC_FULLRESYNC;
1357     }
1358 
1359     if (!strncmp(reply,"+CONTINUE",9)) {
1360         /* Partial resync was accepted, set the replication state accordingly */
1361         serverLog(LL_NOTICE,
1362             "Successful partial resynchronization with master.");
1363         sdsfree(reply);
1364         replicationResurrectCachedMaster(fd);
1365         return PSYNC_CONTINUE;
1366     }
1367 
1368     /* If we reach this point we received either an error since the master does
1369      * not understand PSYNC, or an unexpected reply from the master.
1370      * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
1371 
1372     if (strncmp(reply,"-ERR",4)) {
1373         /* If it's not an error, log the unexpected event. */
1374         serverLog(LL_WARNING,
1375             "Unexpected reply to PSYNC from master: %s", reply);
1376     } else {
1377         serverLog(LL_NOTICE,
1378             "Master does not support PSYNC or is in "
1379             "error state (reply: %s)", reply);
1380     }
1381     sdsfree(reply);
1382     replicationDiscardCachedMaster();
1383     return PSYNC_NOT_SUPPORTED;
1384 }
1385 
/* Event handler driving the slave side of the replication handshake.
 *
 * The handshake is a non blocking state machine keyed on server.repl_state.
 * Every "SEND" state writes one command and returns, waiting for the socket
 * to become readable again. Every "RECEIVE" state reads the matching reply
 * and falls through to the next step. The sequence is:
 *
 *   CONNECTING -> PING -> [AUTH] -> REPLCONF listening-port ->
 *   [REPLCONF ip-address] -> REPLCONF capa eof -> PSYNC (or SYNC)
 *
 * On a partial resynchronization the cached master is resurrected by
 * slaveTryPartialResynchronization(). On a full resynchronization a temp
 * file is created and the socket handler becomes readSyncBulkPayload().
 *
 * On any error the socket (and the temp file, if it was already created) is
 * released and repl_state goes back to REPL_STATE_CONNECT. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err = NULL;
    int dfd = -1, maxtries = 5; /* dfd == -1 means "no temp file opened yet". */
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REPL_STATE_NONE) {
        close(fd);
        return;
    }

    /* Check for errors in the socket. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }

    /* Send a PING to check the master is able to reply without errors. */
    if (server.repl_state == REPL_STATE_CONNECTING) {
        serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REPL_STATE_RECEIVE_PONG;
        /* Send the PING. A missing reply is left to the handshake timeout;
         * only write errors are handled here. */
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
        if (err) goto write_error;
        return;
    }

    /* Receive the PONG command. */
    if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

        /* We accept only two kinds of replies as valid: a positive +PONG
         * reply (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (err[0] != '+' &&
            strncmp(err,"-NOAUTH",7) != 0 &&
            strncmp(err,"-ERR operation not permitted",28) != 0)
        {
            serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
            sdsfree(err);
            goto error;
        } else {
            serverLog(LL_NOTICE,
                "Master replied to PING, replication can continue...");
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_AUTH;
    }

    /* AUTH with the master if required. */
    if (server.repl_state == REPL_STATE_SEND_AUTH) {
        if (server.masterauth) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
            if (err) goto write_error;
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
            return;
        } else {
            server.repl_state = REPL_STATE_SEND_PORT;
        }
    }

    /* Receive AUTH reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        if (err[0] == '-') {
            serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PORT;
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    if (server.repl_state == REPL_STATE_SEND_PORT) {
        sds port = sdsfromlonglong(server.slave_announce_port ?
            server.slave_announce_port : server.port);
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "listening-port",port, NULL);
        sdsfree(port);
        if (err) goto write_error;
        server.repl_state = REPL_STATE_RECEIVE_PORT;
        return;
    }

    /* Receive REPLCONF listening-port reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_IP;
    }

    /* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
    if (server.repl_state == REPL_STATE_SEND_IP &&
        server.slave_announce_ip == NULL)
    {
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Set the slave ip, so that Master's INFO command can list the
     * slave IP address port correctly in case of port forwarding or NAT. */
    if (server.repl_state == REPL_STATE_SEND_IP) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "ip-address",server.slave_announce_ip, NULL);
        if (err) goto write_error;
        server.repl_state = REPL_STATE_RECEIVE_IP;
        return;
    }

    /* Receive REPLCONF ip-address reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_IP) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF ip-address. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                "REPLCONF ip-address: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_CAPA;
    }

    /* Inform the master of our capabilities. While we currently send
     * just one capability, it is possible to chain new capabilities here
     * in the form of REPLCONF capa X capa Y capa Z ...
     * The master will ignore capabilities it does not understand. */
    if (server.repl_state == REPL_STATE_SEND_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
                "capa","eof",NULL);
        if (err) goto write_error;
        server.repl_state = REPL_STATE_RECEIVE_CAPA;
        return;
    }

    /* Receive CAPA reply. */
    if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
        err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF capa. */
        if (err[0] == '-') {
            serverLog(LL_NOTICE,"(Non critical) Master does not understand "
                                  "REPLCONF capa: %s", err);
        }
        sdsfree(err);
        server.repl_state = REPL_STATE_SEND_PSYNC;
    }

    /* Try a partial resynchronization. If we don't have a cached master
     * slaveTryPartialResynchronization() will at least try to use PSYNC
     * to start a full resynchronization so that we get the master run id
     * and the global offset, to try a partial resync at the next
     * reconnection attempt. */
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
        if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
            err = sdsnew("Write error sending the PSYNC command.");
            goto write_error;
        }
        server.repl_state = REPL_STATE_RECEIVE_PSYNC;
        return;
    }

    /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
    if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
        serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
                             "state should be RECEIVE_PSYNC but is %d",
                             server.repl_state);
        goto error;
    }

    psync_result = slaveTryPartialResynchronization(fd,1);
    if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

    /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
     * uninstalling the read handler from the file descriptor. */

    if (psync_result == PSYNC_CONTINUE) {
        serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* PSYNC failed or is not supported: we want our slaves to resync with us
     * as well, if we have any (chained replication case). The master may
     * transfer us an entirely different data set and we have no way to
     * incrementally feed our slaves after that. */
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        serverLog(LL_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }

    /* Prepare a suitable temp file for bulk transfer. The name embeds the
     * current time: use time(NULL) rather than the cached server.unixtime,
     * which is not refreshed during the blocking sleep(1) below, otherwise
     * every retry would try the very same (already existing) name. */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)time(NULL),(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    server.repl_state = REPL_STATE_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;

error:
    aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
    if (dfd != -1) {
        /* The temp file was created but the transfer never started:
         * release the descriptor and remove the (empty) file we created. */
        close(dfd);
        unlink(tmpfile);
    }
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REPL_STATE_CONNECT;
    return;

write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */
    serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
    sdsfree(err);
    goto error;
}
1653 
connectWithMaster(void)1654 int connectWithMaster(void) {
1655     int fd;
1656 
1657     fd = anetTcpNonBlockBestEffortBindConnect(NULL,
1658         server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
1659     if (fd == -1) {
1660         serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
1661             strerror(errno));
1662         return C_ERR;
1663     }
1664 
1665     if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
1666             AE_ERR)
1667     {
1668         close(fd);
1669         serverLog(LL_WARNING,"Can't create readable event for SYNC");
1670         return C_ERR;
1671     }
1672 
1673     server.repl_transfer_lastio = server.unixtime;
1674     server.repl_transfer_s = fd;
1675     server.repl_state = REPL_STATE_CONNECTING;
1676     return C_OK;
1677 }
1678 
1679 /* This function can be called when a non blocking connection is currently
1680  * in progress to undo it.
1681  * Never call this function directly, use cancelReplicationHandshake() instead.
1682  */
undoConnectWithMaster(void)1683 void undoConnectWithMaster(void) {
1684     int fd = server.repl_transfer_s;
1685 
1686     aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
1687     close(fd);
1688     server.repl_transfer_s = -1;
1689 }
1690 
1691 /* Abort the async download of the bulk dataset while SYNC-ing with master.
1692  * Never call this function directly, use cancelReplicationHandshake() instead.
1693  */
replicationAbortSyncTransfer(void)1694 void replicationAbortSyncTransfer(void) {
1695     serverAssert(server.repl_state == REPL_STATE_TRANSFER);
1696     undoConnectWithMaster();
1697     close(server.repl_transfer_fd);
1698     unlink(server.repl_transfer_tmpfile);
1699     zfree(server.repl_transfer_tmpfile);
1700 }
1701 
1702 /* This function aborts a non blocking replication attempt if there is one
1703  * in progress, by canceling the non-blocking connect attempt or
1704  * the initial bulk transfer.
1705  *
1706  * If there was a replication handshake in progress 1 is returned and
1707  * the replication state (server.repl_state) set to REPL_STATE_CONNECT.
1708  *
1709  * Otherwise zero is returned and no operation is perforemd at all. */
cancelReplicationHandshake(void)1710 int cancelReplicationHandshake(void) {
1711     if (server.repl_state == REPL_STATE_TRANSFER) {
1712         replicationAbortSyncTransfer();
1713         server.repl_state = REPL_STATE_CONNECT;
1714     } else if (server.repl_state == REPL_STATE_CONNECTING ||
1715                slaveIsInHandshakeState())
1716     {
1717         undoConnectWithMaster();
1718         server.repl_state = REPL_STATE_CONNECT;
1719     } else {
1720         return 0;
1721     }
1722     return 1;
1723 }
1724 
1725 /* Set replication to the specified master address and port. */
replicationSetMaster(char * ip,int port)1726 void replicationSetMaster(char *ip, int port) {
1727     sdsfree(server.masterhost);
1728     server.masterhost = sdsnew(ip);
1729     server.masterport = port;
1730     if (server.master) freeClient(server.master);
1731     disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
1732     disconnectSlaves(); /* Force our slaves to resync with us as well. */
1733     replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
1734     freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
1735     cancelReplicationHandshake();
1736     server.repl_state = REPL_STATE_CONNECT;
1737     server.master_repl_offset = 0;
1738     server.repl_down_since = 0;
1739 }
1740 
1741 /* Cancel replication, setting the instance as a master itself. */
replicationUnsetMaster(void)1742 void replicationUnsetMaster(void) {
1743     if (server.masterhost == NULL) return; /* Nothing to do. */
1744     sdsfree(server.masterhost);
1745     server.masterhost = NULL;
1746     if (server.master) {
1747         if (listLength(server.slaves) == 0) {
1748             /* If this instance is turned into a master and there are no
1749              * slaves, it inherits the replication offset from the master.
1750              * Under certain conditions this makes replicas comparable by
1751              * replication offset to understand what is the most updated. */
1752             server.master_repl_offset = server.master->reploff;
1753             freeReplicationBacklog();
1754         }
1755         freeClient(server.master);
1756     }
1757     replicationDiscardCachedMaster();
1758     cancelReplicationHandshake();
1759     server.repl_state = REPL_STATE_NONE;
1760 }
1761 
1762 /* This function is called when the slave lose the connection with the
1763  * master into an unexpected way. */
replicationHandleMasterDisconnection(void)1764 void replicationHandleMasterDisconnection(void) {
1765     server.master = NULL;
1766     server.repl_state = REPL_STATE_CONNECT;
1767     server.repl_down_since = server.unixtime;
1768     /* We lost connection with our master, don't disconnect slaves yet,
1769      * maybe we'll be able to PSYNC with our master later. We'll disconnect
1770      * the slaves only if we'll have to do a full resync with our master. */
1771 }
1772 
/* SLAVEOF <host> <port> | SLAVEOF NO ONE
 *
 * Point this instance to a new master. With the special arguments "NO" "ONE"
 * the instance is turned back into a master instead. The command is refused
 * in cluster mode, and the port must be a valid TCP port (0-65535). */
void slaveofCommand(client *c) {
    /* SLAVEOF is not allowed in cluster mode as replication is automatically
     * configured using the current address of the master node. */
    if (server.cluster_enabled) {
        addReplyError(c,"SLAVEOF not allowed in cluster mode.");
        return;
    }

    /* The special host/port combination "NO" "ONE" turns the instance
     * into a master. Otherwise the new master address is set. */
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
                client);
            sdsfree(client);
        }
    } else {
        long port;

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
            return;

        /* The port ends up in an int (server.masterport) and is used as a
         * TCP port: reject out-of-range values instead of silently
         * truncating or storing them. */
        if (port < 0 || port > 65535) {
            addReplyError(c,"Invalid master port");
            return;
        }

        /* Check if we are already attached to the specified master. */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        replicationSetMaster(c->argv[1]->ptr, port);
        sds client = catClientInfoString(sdsempty(),c);
        serverLog(LL_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')",
            server.masterhost, server.masterport, client);
        sdsfree(client);
    }
    addReply(c,shared.ok);
}
1815 
1816 /* ROLE command: provide information about the role of the instance
1817  * (master or slave) and additional information related to replication
1818  * in an easy to process format. */
roleCommand(client * c)1819 void roleCommand(client *c) {
1820     if (server.masterhost == NULL) {
1821         listIter li;
1822         listNode *ln;
1823         void *mbcount;
1824         int slaves = 0;
1825 
1826         addReplyMultiBulkLen(c,3);
1827         addReplyBulkCBuffer(c,"master",6);
1828         addReplyLongLong(c,server.master_repl_offset);
1829         mbcount = addDeferredMultiBulkLength(c);
1830         listRewind(server.slaves,&li);
1831         while((ln = listNext(&li))) {
1832             client *slave = ln->value;
1833             char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;
1834 
1835             if (slaveip[0] == '\0') {
1836                 if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1)
1837                     continue;
1838                 slaveip = ip;
1839             }
1840             if (slave->replstate != SLAVE_STATE_ONLINE) continue;
1841             addReplyMultiBulkLen(c,3);
1842             addReplyBulkCString(c,slaveip);
1843             addReplyBulkLongLong(c,slave->slave_listening_port);
1844             addReplyBulkLongLong(c,slave->repl_ack_off);
1845             slaves++;
1846         }
1847         setDeferredMultiBulkLength(c,mbcount,slaves);
1848     } else {
1849         char *slavestate = NULL;
1850 
1851         addReplyMultiBulkLen(c,5);
1852         addReplyBulkCBuffer(c,"slave",5);
1853         addReplyBulkCString(c,server.masterhost);
1854         addReplyLongLong(c,server.masterport);
1855         if (slaveIsInHandshakeState()) {
1856             slavestate = "handshake";
1857         } else {
1858             switch(server.repl_state) {
1859             case REPL_STATE_NONE: slavestate = "none"; break;
1860             case REPL_STATE_CONNECT: slavestate = "connect"; break;
1861             case REPL_STATE_CONNECTING: slavestate = "connecting"; break;
1862             case REPL_STATE_TRANSFER: slavestate = "sync"; break;
1863             case REPL_STATE_CONNECTED: slavestate = "connected"; break;
1864             default: slavestate = "unknown"; break;
1865             }
1866         }
1867         addReplyBulkCString(c,slavestate);
1868         addReplyLongLong(c,server.master ? server.master->reploff : -1);
1869     }
1870 }
1871 
1872 /* Send a REPLCONF ACK command to the master to inform it about the current
1873  * processed offset. If we are not connected with a master, the command has
1874  * no effects. */
replicationSendAck(void)1875 void replicationSendAck(void) {
1876     client *c = server.master;
1877 
1878     if (c != NULL) {
1879         c->flags |= CLIENT_MASTER_FORCE_REPLY;
1880         addReplyMultiBulkLen(c,3);
1881         addReplyBulkCString(c,"REPLCONF");
1882         addReplyBulkCString(c,"ACK");
1883         addReplyBulkLongLong(c,c->reploff);
1884         c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
1885     }
1886 }
1887 
1888 /* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
1889 
1890 /* In order to implement partial synchronization we need to be able to cache
1891  * our master's client structure after a transient disconnection.
1892  * It is cached into server.cached_master and flushed away using the following
1893  * functions. */
1894 
1895 /* This function is called by freeClient() in order to cache the master
1896  * client structure instead of destryoing it. freeClient() will return
1897  * ASAP after this function returns, so every action needed to avoid problems
1898  * with a client that is really "suspended" has to be done by this function.
1899  *
1900  * The other functions that will deal with the cached master are:
1901  *
1902  * replicationDiscardCachedMaster() that will make sure to kill the client
1903  * as for some reason we don't want to use it in the future.
1904  *
1905  * replicationResurrectCachedMaster() that is used after a successful PSYNC
1906  * handshake in order to reactivate the cached master.
1907  */
replicationCacheMaster(client * c)1908 void replicationCacheMaster(client *c) {
1909     serverAssert(server.master != NULL && server.cached_master == NULL);
1910     serverLog(LL_NOTICE,"Caching the disconnected master state.");
1911 
1912     /* Unlink the client from the server structures. */
1913     unlinkClient(c);
1914 
1915     /* Save the master. Server.master will be set to null later by
1916      * replicationHandleMasterDisconnection(). */
1917     server.cached_master = server.master;
1918 
1919     /* Invalidate the Peer ID cache. */
1920     if (c->peerid) {
1921         sdsfree(c->peerid);
1922         c->peerid = NULL;
1923     }
1924 
1925     /* Caching the master happens instead of the actual freeClient() call,
1926      * so make sure to adjust the replication state. This function will
1927      * also set server.master to NULL. */
1928     replicationHandleMasterDisconnection();
1929 }
1930 
1931 /* Free a cached master, called when there are no longer the conditions for
1932  * a partial resync on reconnection. */
replicationDiscardCachedMaster(void)1933 void replicationDiscardCachedMaster(void) {
1934     if (server.cached_master == NULL) return;
1935 
1936     serverLog(LL_NOTICE,"Discarding previously cached master state.");
1937     server.cached_master->flags &= ~CLIENT_MASTER;
1938     freeClient(server.cached_master);
1939     server.cached_master = NULL;
1940 }
1941 
1942 /* Turn the cached master into the current master, using the file descriptor
1943  * passed as argument as the socket for the new master.
1944  *
1945  * This function is called when successfully setup a partial resynchronization
1946  * so the stream of data that we'll receive will start from were this
1947  * master left. */
replicationResurrectCachedMaster(int newfd)1948 void replicationResurrectCachedMaster(int newfd) {
1949     server.master = server.cached_master;
1950     server.cached_master = NULL;
1951     server.master->fd = newfd;
1952     server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
1953     server.master->authenticated = 1;
1954     server.master->lastinteraction = server.unixtime;
1955     server.repl_state = REPL_STATE_CONNECTED;
1956 
1957     /* Re-add to the list of clients. */
1958     listAddNodeTail(server.clients,server.master);
1959     if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
1960                           readQueryFromClient, server.master)) {
1961         serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
1962         freeClientAsync(server.master); /* Close ASAP. */
1963     }
1964 
1965     /* We may also need to install the write handler as well if there is
1966      * pending data in the write buffers. */
1967     if (clientHasPendingReplies(server.master)) {
1968         if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
1969                           sendReplyToClient, server.master)) {
1970             serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
1971             freeClientAsync(server.master); /* Close ASAP. */
1972         }
1973     }
1974 }
1975 
1976 /* ------------------------- MIN-SLAVES-TO-WRITE  --------------------------- */
1977 
1978 /* This function counts the number of slaves with lag <= min-slaves-max-lag.
1979  * If the option is active, the server will prevent writes if there are not
1980  * enough connected slaves with the specified lag (or less). */
refreshGoodSlavesCount(void)1981 void refreshGoodSlavesCount(void) {
1982     listIter li;
1983     listNode *ln;
1984     int good = 0;
1985 
1986     if (!server.repl_min_slaves_to_write ||
1987         !server.repl_min_slaves_max_lag) return;
1988 
1989     listRewind(server.slaves,&li);
1990     while((ln = listNext(&li))) {
1991         client *slave = ln->value;
1992         time_t lag = server.unixtime - slave->repl_ack_time;
1993 
1994         if (slave->replstate == SLAVE_STATE_ONLINE &&
1995             lag <= server.repl_min_slaves_max_lag) good++;
1996     }
1997     server.repl_good_slaves_count = good;
1998 }
1999 
2000 /* ----------------------- REPLICATION SCRIPT CACHE --------------------------
2001  * The goal of this code is to keep track of scripts already sent to every
2002  * connected slave, in order to be able to replicate EVALSHA as it is without
2003  * translating it to EVAL every time it is possible.
2004  *
2005  * We use a capped collection implemented by a hash table for fast lookup
2006  * of scripts we can send as EVALSHA, plus a linked list that is used for
2007  * eviction of the oldest entry when the max number of items is reached.
2008  *
 * We don't keep a separate cache for every slave, since refilling the cache
 * is cheap: the goal of this code is to avoid transmitting the same big
 * script many times per second, wasting bandwidth and CPU. Rebuilding the
 * cache from scratch from time to time is not a problem: every script in use
 * only needs to be transmitted once to reappear in the cache.
2015  *
2016  * This is how the system works:
2017  *
2018  * 1) Every time a new slave connects, we flush the whole script cache.
2019  * 2) We only send as EVALSHA what was sent to the master as EVALSHA, without
2020  *    trying to convert EVAL into EVALSHA specifically for slaves.
 * 3) Every time we transmit a script as EVAL to the slaves, we also add the
 *    corresponding SHA1 of the script into the cache, since from then on we
 *    are sure every slave knows about the script.
2024  * 4) On SCRIPT FLUSH command, we replicate the command to all the slaves
2025  *    and at the same time flush the script cache.
2026  * 5) When the last slave disconnects, flush the cache.
2027  * 6) We handle SCRIPT LOAD as well since that's how scripts are loaded
2028  *    in the master sometimes.
2029  */
2030 
2031 /* Initialize the script cache, only called at startup. */
replicationScriptCacheInit(void)2032 void replicationScriptCacheInit(void) {
2033     server.repl_scriptcache_size = 10000;
2034     server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
2035     server.repl_scriptcache_fifo = listCreate();
2036 }
2037 
2038 /* Empty the script cache. Should be called every time we are no longer sure
2039  * that every slave knows about all the scripts in our set, or when the
2040  * current AOF "context" is no longer aware of the script. In general we
2041  * should flush the cache:
2042  *
2043  * 1) Every time a new slave reconnects to this master and performs a
2044  *    full SYNC (PSYNC does not require flushing).
2045  * 2) Every time an AOF rewrite is performed.
2046  * 3) Every time we are left without slaves at all, and AOF is off, in order
2047  *    to reclaim otherwise unused memory.
2048  */
replicationScriptCacheFlush(void)2049 void replicationScriptCacheFlush(void) {
2050     dictEmpty(server.repl_scriptcache_dict,NULL);
2051     listRelease(server.repl_scriptcache_fifo);
2052     server.repl_scriptcache_fifo = listCreate();
2053 }
2054 
2055 /* Add an entry into the script cache, if we reach max number of entries the
2056  * oldest is removed from the list. */
replicationScriptCacheAdd(sds sha1)2057 void replicationScriptCacheAdd(sds sha1) {
2058     int retval;
2059     sds key = sdsdup(sha1);
2060 
2061     /* Evict oldest. */
2062     if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
2063     {
2064         listNode *ln = listLast(server.repl_scriptcache_fifo);
2065         sds oldest = listNodeValue(ln);
2066 
2067         retval = dictDelete(server.repl_scriptcache_dict,oldest);
2068         serverAssert(retval == DICT_OK);
2069         listDelNode(server.repl_scriptcache_fifo,ln);
2070     }
2071 
2072     /* Add current. */
2073     retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
2074     listAddNodeHead(server.repl_scriptcache_fifo,key);
2075     serverAssert(retval == DICT_OK);
2076 }
2077 
2078 /* Returns non-zero if the specified entry exists inside the cache, that is,
2079  * if all the slaves are aware of this script SHA1. */
replicationScriptCacheExists(sds sha1)2080 int replicationScriptCacheExists(sds sha1) {
2081     return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
2082 }
2083 
2084 /* ----------------------- SYNCHRONOUS REPLICATION --------------------------
2085  * Redis synchronous replication design can be summarized in points:
2086  *
2087  * - Redis masters have a global replication offset, used by PSYNC.
 * - Masters increment the offset every time new commands are sent to slaves.
2089  * - Slaves ping back masters with the offset processed so far.
2090  *
2091  * So synchronous replication adds a new WAIT command in the form:
2092  *
2093  *   WAIT <num_replicas> <milliseconds_timeout>
2094  *
2095  * That returns the number of replicas that processed the query when
2096  * we finally have at least num_replicas, or when the timeout was
2097  * reached.
2098  *
2099  * The command is implemented in this way:
2100  *
2101  * - Every time a client processes a command, we remember the replication
2102  *   offset after sending that command to the slaves.
2103  * - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
2104  *   The client is blocked at the same time (see blocked.c).
2105  * - Once we receive enough ACKs for a given offset or when the timeout
2106  *   is reached, the WAIT command is unblocked and the reply sent to the
2107  *   client.
2108  */
2109 
2110 /* This just set a flag so that we broadcast a REPLCONF GETACK command
2111  * to all the slaves in the beforeSleep() function. Note that this way
2112  * we "group" all the clients that want to wait for synchronouns replication
2113  * in a given event loop iteration, and send a single GETACK for them all. */
replicationRequestAckFromSlaves(void)2114 void replicationRequestAckFromSlaves(void) {
2115     server.get_ack_from_slaves = 1;
2116 }
2117 
2118 /* Return the number of slaves that already acknowledged the specified
2119  * replication offset. */
replicationCountAcksByOffset(long long offset)2120 int replicationCountAcksByOffset(long long offset) {
2121     listIter li;
2122     listNode *ln;
2123     int count = 0;
2124 
2125     listRewind(server.slaves,&li);
2126     while((ln = listNext(&li))) {
2127         client *slave = ln->value;
2128 
2129         if (slave->replstate != SLAVE_STATE_ONLINE) continue;
2130         if (slave->repl_ack_off >= offset) count++;
2131     }
2132     return count;
2133 }
2134 
2135 /* WAIT for N replicas to acknowledge the processing of our latest
2136  * write command (and all the previous commands). */
waitCommand(client * c)2137 void waitCommand(client *c) {
2138     mstime_t timeout;
2139     long numreplicas, ackreplicas;
2140     long long offset = c->woff;
2141 
2142     /* Argument parsing. */
2143     if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
2144         return;
2145     if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
2146         != C_OK) return;
2147 
2148     /* First try without blocking at all. */
2149     ackreplicas = replicationCountAcksByOffset(c->woff);
2150     if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) {
2151         addReplyLongLong(c,ackreplicas);
2152         return;
2153     }
2154 
2155     /* Otherwise block the client and put it into our list of clients
2156      * waiting for ack from slaves. */
2157     c->bpop.timeout = timeout;
2158     c->bpop.reploffset = offset;
2159     c->bpop.numreplicas = numreplicas;
2160     listAddNodeTail(server.clients_waiting_acks,c);
2161     blockClient(c,BLOCKED_WAIT);
2162 
2163     /* Make sure that the server will send an ACK request to all the slaves
2164      * before returning to the event loop. */
2165     replicationRequestAckFromSlaves();
2166 }
2167 
2168 /* This is called by unblockClient() to perform the blocking op type
2169  * specific cleanup. We just remove the client from the list of clients
2170  * waiting for replica acks. Never call it directly, call unblockClient()
2171  * instead. */
unblockClientWaitingReplicas(client * c)2172 void unblockClientWaitingReplicas(client *c) {
2173     listNode *ln = listSearchKey(server.clients_waiting_acks,c);
2174     serverAssert(ln != NULL);
2175     listDelNode(server.clients_waiting_acks,ln);
2176 }
2177 
2178 /* Check if there are clients blocked in WAIT that can be unblocked since
2179  * we received enough ACKs from slaves. */
processClientsWaitingReplicas(void)2180 void processClientsWaitingReplicas(void) {
2181     long long last_offset = 0;
2182     int last_numreplicas = 0;
2183 
2184     listIter li;
2185     listNode *ln;
2186 
2187     listRewind(server.clients_waiting_acks,&li);
2188     while((ln = listNext(&li))) {
2189         client *c = ln->value;
2190 
2191         /* Every time we find a client that is satisfied for a given
2192          * offset and number of replicas, we remember it so the next client
2193          * may be unblocked without calling replicationCountAcksByOffset()
2194          * if the requested offset / replicas were equal or less. */
2195         if (last_offset && last_offset > c->bpop.reploffset &&
2196                            last_numreplicas > c->bpop.numreplicas)
2197         {
2198             unblockClient(c);
2199             addReplyLongLong(c,last_numreplicas);
2200         } else {
2201             int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);
2202 
2203             if (numreplicas >= c->bpop.numreplicas) {
2204                 last_offset = c->bpop.reploffset;
2205                 last_numreplicas = numreplicas;
2206                 unblockClient(c);
2207                 addReplyLongLong(c,numreplicas);
2208             }
2209         }
2210     }
2211 }
2212 
2213 /* Return the slave replication offset for this instance, that is
2214  * the offset for which we already processed the master replication stream. */
replicationGetSlaveOffset(void)2215 long long replicationGetSlaveOffset(void) {
2216     long long offset = 0;
2217 
2218     if (server.masterhost != NULL) {
2219         if (server.master) {
2220             offset = server.master->reploff;
2221         } else if (server.cached_master) {
2222             offset = server.cached_master->reploff;
2223         }
2224     }
2225     /* offset may be -1 when the master does not support it at all, however
2226      * this function is designed to return an offset that can express the
2227      * amount of data processed by the master, so we return a positive
2228      * integer. */
2229     if (offset < 0) offset = 0;
2230     return offset;
2231 }
2232 
2233 /* --------------------------- REPLICATION CRON  ---------------------------- */
2234 
2235 /* Replication cron function, called 1 time per second. */
replicationCron(void)2236 void replicationCron(void) {
2237     static long long replication_cron_loops = 0;
2238 
2239     /* Non blocking connection timeout? */
2240     if (server.masterhost &&
2241         (server.repl_state == REPL_STATE_CONNECTING ||
2242          slaveIsInHandshakeState()) &&
2243          (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
2244     {
2245         serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
2246         cancelReplicationHandshake();
2247     }
2248 
2249     /* Bulk transfer I/O timeout? */
2250     if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
2251         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
2252     {
2253         serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
2254         cancelReplicationHandshake();
2255     }
2256 
2257     /* Timed out master when we are an already connected slave? */
2258     if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
2259         (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
2260     {
2261         serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
2262         freeClient(server.master);
2263     }
2264 
2265     /* Check if we should connect to a MASTER */
2266     if (server.repl_state == REPL_STATE_CONNECT) {
2267         serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
2268             server.masterhost, server.masterport);
2269         if (connectWithMaster() == C_OK) {
2270             serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
2271         }
2272     }
2273 
2274     /* Send ACK to master from time to time.
2275      * Note that we do not send periodic acks to masters that don't
2276      * support PSYNC and replication offsets. */
2277     if (server.masterhost && server.master &&
2278         !(server.master->flags & CLIENT_PRE_PSYNC))
2279         replicationSendAck();
2280 
2281     /* If we have attached slaves, PING them from time to time.
2282      * So slaves can implement an explicit timeout to masters, and will
2283      * be able to detect a link disconnection even if the TCP connection
2284      * will not actually go down. */
2285     listIter li;
2286     listNode *ln;
2287     robj *ping_argv[1];
2288 
2289     /* First, send PING according to ping_slave_period. */
2290     if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {
2291         ping_argv[0] = createStringObject("PING",4);
2292         replicationFeedSlaves(server.slaves, server.slaveseldb,
2293             ping_argv, 1);
2294         decrRefCount(ping_argv[0]);
2295     }
2296 
2297     /* Second, send a newline to all the slaves in pre-synchronization
2298      * stage, that is, slaves waiting for the master to create the RDB file.
2299      * The newline will be ignored by the slave but will refresh the
2300      * last-io timer preventing a timeout. In this case we ignore the
2301      * ping period and refresh the connection once per second since certain
2302      * timeouts are set at a few seconds (example: PSYNC response). */
2303     listRewind(server.slaves,&li);
2304     while((ln = listNext(&li))) {
2305         client *slave = ln->value;
2306 
2307         if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
2308             (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
2309              server.rdb_child_type != RDB_CHILD_TYPE_SOCKET))
2310         {
2311             if (write(slave->fd, "\n", 1) == -1) {
2312                 /* Don't worry, it's just a ping. */
2313             }
2314         }
2315     }
2316 
2317     /* Disconnect timedout slaves. */
2318     if (listLength(server.slaves)) {
2319         listIter li;
2320         listNode *ln;
2321 
2322         listRewind(server.slaves,&li);
2323         while((ln = listNext(&li))) {
2324             client *slave = ln->value;
2325 
2326             if (slave->replstate != SLAVE_STATE_ONLINE) continue;
2327             if (slave->flags & CLIENT_PRE_PSYNC) continue;
2328             if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
2329             {
2330                 serverLog(LL_WARNING, "Disconnecting timedout slave: %s",
2331                     replicationGetSlaveName(slave));
2332                 freeClient(slave);
2333             }
2334         }
2335     }
2336 
2337     /* If we have no attached slaves and there is a replication backlog
2338      * using memory, free it after some (configured) time. */
2339     if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
2340         server.repl_backlog)
2341     {
2342         time_t idle = server.unixtime - server.repl_no_slaves_since;
2343 
2344         if (idle > server.repl_backlog_time_limit) {
2345             freeReplicationBacklog();
2346             serverLog(LL_NOTICE,
2347                 "Replication backlog freed after %d seconds "
2348                 "without connected slaves.",
2349                 (int) server.repl_backlog_time_limit);
2350         }
2351     }
2352 
2353     /* If AOF is disabled and we no longer have attached slaves, we can
2354      * free our Replication Script Cache as there is no need to propagate
2355      * EVALSHA at all. */
2356     if (listLength(server.slaves) == 0 &&
2357         server.aof_state == AOF_OFF &&
2358         listLength(server.repl_scriptcache_fifo) != 0)
2359     {
2360         replicationScriptCacheFlush();
2361     }
2362 
2363     /* Start a BGSAVE good for replication if we have slaves in
2364      * WAIT_BGSAVE_START state.
2365      *
2366      * In case of diskless replication, we make sure to wait the specified
2367      * number of seconds (according to configuration) so that other slaves
2368      * have the time to arrive before we start streaming. */
2369     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
2370         time_t idle, max_idle = 0;
2371         int slaves_waiting = 0;
2372         int mincapa = -1;
2373         listNode *ln;
2374         listIter li;
2375 
2376         listRewind(server.slaves,&li);
2377         while((ln = listNext(&li))) {
2378             client *slave = ln->value;
2379             if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
2380                 idle = server.unixtime - slave->lastinteraction;
2381                 if (idle > max_idle) max_idle = idle;
2382                 slaves_waiting++;
2383                 mincapa = (mincapa == -1) ? slave->slave_capa :
2384                                             (mincapa & slave->slave_capa);
2385             }
2386         }
2387 
2388         if (slaves_waiting &&
2389             (!server.repl_diskless_sync ||
2390              max_idle > server.repl_diskless_sync_delay))
2391         {
2392             /* Start the BGSAVE. The called function may start a
2393              * BGSAVE with socket target or disk target depending on the
2394              * configuration and slaves capabilities. */
2395             startBgsaveForReplication(mincapa);
2396         }
2397     }
2398 
2399     /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
2400     refreshGoodSlavesCount();
2401     replication_cron_loops++; /* Incremented with frequency 1 HZ. */
2402 }
2403